You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/12/07 21:47:07 UTC

[2/2] kudu git commit: block manager: start using the file cache

block manager: start using the file cache

This commit integrates the file cache into both the file and log block
managers. The capacity of the cache is determined at runtime by inspecting
the RLIMIT_NOFILE resource limit; because the cache doesn't manage all open
files, we use a rather conservative 50% of the limit. This can be overridden
with a command line flag, which can also be used to disable file caching
altogether.

Other changes of note:
- Unlike the FBM, the LBM creates and opens container files for read/write
  in the same operation. Since that kind of behavior isn't supported by the
  file cache, we close the files after creating them, then reopen them
  through the cache. While inelegant, I don't expect this to be problematic.
- block_manager-stress-test now uses a PeriodicOpenFdChecker to make sure
  the file cache is working correctly. Some of the test behavior was tweaked
  to increase the number of blocks, and to avoid violating the semantics of
  the file cache.
- BlockManagerTest.CloseManyBlocksTest now uses 1000 blocks, which is enough
  for it to fail with the FBM sans file cache when the process resource
  limit is 1024 open files (the default on my Ubuntu 16.04 installation).
- When constructing a block manager, we first try to increase the
  RLIMIT_NOFILE soft limit for the process to be equal to the hard limit. On
  many systems there's enough of a gap between them that this can add a lot
  of cache capacity "for free".

Change-Id: Ieeefd31eca340111bc535eac1f982290e7703a88
Reviewed-on: http://gerrit.cloudera.org:8080/5147
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3fa9db76
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3fa9db76
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3fa9db76

Branch: refs/heads/master
Commit: 3fa9db761e35210a9e83b0be279bc44c95f6ccbf
Parents: 6a124f0
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Nov 17 01:41:41 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Dec 7 21:46:29 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-stress-test.cc | 159 ++++++++++++++++----------
 src/kudu/fs/block_manager-test.cc        |  35 +++---
 src/kudu/fs/block_manager.cc             |  40 +++++++
 src/kudu/fs/block_manager.h              |  13 ++-
 src/kudu/fs/file_block_manager.cc        |  30 ++++-
 src/kudu/fs/file_block_manager.h         |   6 +
 src/kudu/fs/log_block_manager.cc         |  89 ++++++++++----
 src/kudu/fs/log_block_manager.h          |   9 +-
 src/kudu/rpc/rpc-test.cc                 |  10 +-
 src/kudu/util/env.h                      |   9 ++
 src/kudu/util/env_posix.cc               |  26 +++++
 src/kudu/util/pb_util.cc                 |   4 +-
 src/kudu/util/pb_util.h                  |   8 +-
 13 files changed, 317 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index f5c6353..7d591ca 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <cmath>
-#include <memory>
 #include <mutex>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "kudu/fs/file_block_manager.h"
@@ -27,27 +28,33 @@
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/atomic.h"
+#include "kudu/util/file_cache-test-util.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/random.h"
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+DECLARE_int64(block_manager_max_open_files);
+DECLARE_uint64(log_container_max_size);
+DECLARE_uint64(log_container_preallocate_bytes);
+DECLARE_bool(never_fsync);
+
 DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
 DEFINE_int32(num_writer_threads, 4, "Number of writer threads to run");
 DEFINE_int32(num_reader_threads, 8, "Number of reader threads to run");
 DEFINE_int32(num_deleter_threads, 1, "Number of deleter threads to run");
 DEFINE_int32(block_group_size, 8, "Number of blocks to write per block "
              "group. Must be power of 2");
-DEFINE_int32(block_group_bytes, 64 * 1024,
+DEFINE_int32(block_group_bytes, 32 * 1024,
              "Total amount of data (in bytes) to write per block group");
-DEFINE_int32(num_bytes_per_write, 64,
+DEFINE_int32(num_bytes_per_write, 32,
              "Number of bytes to write at a time");
 DEFINE_string(block_manager_paths, "", "Comma-separated list of paths to "
               "use for block storage. If empty, will use the default unit "
               "test path");
 
-using std::shared_ptr;
 using std::string;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -59,7 +66,7 @@ namespace fs {
 // writing threads (default 2) that do the following in a tight loop:
 // - create a new group of blocks (default 10)
 // - write a PRNG seed into each block
-// - write a big chunk of data (default 64m) into the block group:
+// - write a big chunk of data (default 32k) into the block group:
 //   - pick the next block to write a piece to at random
 //   - write one piece at a time (default 64k) of data generated using
 //     that block's PRNG seed
@@ -70,22 +77,33 @@ namespace fs {
 // - read the block fully into memory, parsing its seed
 // - verify that the contents of the block match the PRNG output
 // deleting threads (default 1) that do the following every second:
-// - drain the block_id vector(write locked)
+// - drain one random block (with no openers) from the block_id map (locked)
 // - delete all the blocks drained from the vector
-//
-// TODO: Don't delete all blocks ala "permgen".
 template <typename T>
 class BlockManagerStressTest : public KuduTest {
  public:
   BlockManagerStressTest() :
     rand_seed_(SeedRandom()),
     stop_latch_(1),
-    bm_(CreateBlockManager()),
     total_blocks_written_(0),
     total_bytes_written_(0),
     total_blocks_read_(0),
     total_bytes_read_(0),
     total_blocks_deleted_(0) {
+
+    // Increases total number of blocks manipulated, which is the right kind
+    // of stress for this test.
+    FLAGS_never_fsync = true;
+
+    // Increase the number of containers created.
+    FLAGS_log_container_max_size = 1 * 1024 * 1024;
+    FLAGS_log_container_preallocate_bytes = 1 * 1024 * 1024;
+
+    // Ensure the file cache is under stress too.
+    FLAGS_block_manager_max_open_files = 512;
+
+    // Defer block manager creation until after the above flags are set.
+    bm_.reset(CreateBlockManager());
   }
 
   virtual void SetUp() OVERRIDE {
@@ -164,6 +182,8 @@ class BlockManagerStressTest : public KuduTest {
   void ReaderThread();
   void DeleterThread();
 
+  int GetMaxFdCount() const;
+
  protected:
   // Used to generate random data. All PRNG instances are seeded with this
   // value to ensure that the test is reproducible.
@@ -173,10 +193,13 @@ class BlockManagerStressTest : public KuduTest {
   CountDownLatch stop_latch_;
 
   // Tracks blocks that have been synced and are ready to be read/deleted.
-  vector<BlockId> written_blocks_;
+  //
+  // Each entry is a block id and the number of in-progress openers. To delete
+  // a block, there must be no openers.
+  unordered_map<BlockId, int, BlockIdHash> written_blocks_;
 
   // Protects written_blocks_.
-  rw_spinlock lock_;
+  simple_spinlock lock_;
 
   // The block manager.
   gscoped_ptr<BlockManager> bm_;
@@ -197,9 +220,6 @@ class BlockManagerStressTest : public KuduTest {
 
 template <typename T>
 void BlockManagerStressTest<T>::WriterThread() {
-  string thread_name = Thread::current_thread()->name();
-  LOG(INFO) << "Thread " << thread_name << " starting";
-
   Random rand(rand_seed_);
   size_t num_blocks_written = 0;
   size_t num_bytes_written = 0;
@@ -216,7 +236,6 @@ void BlockManagerStressTest<T>::WriterThread() {
 
       const uint32_t seed = rand.Next() + 1;
       Slice seed_slice(reinterpret_cast<const uint8_t*>(&seed), sizeof(seed));
-      LOG(INFO) << "Creating block " << block->id().ToString() << " with seed " << seed;
       CHECK_OK(block->Append(seed_slice));
 
       dirty_blocks.push_back(block.release());
@@ -227,7 +246,6 @@ void BlockManagerStressTest<T>::WriterThread() {
     //
     // To emulate a real life workload, we pick the next block to write at
     // random, and write a smaller chunk of data to it.
-    LOG(INFO) << "Writing " << FLAGS_block_group_bytes << " bytes into new blocks";
     size_t total_dirty_bytes = 0;
     while (total_dirty_bytes < FLAGS_block_group_bytes) {
       // Pick the next block.
@@ -249,65 +267,65 @@ void BlockManagerStressTest<T>::WriterThread() {
     //
     // We could close them implicitly when the blocks are destructed but
     // this way we can check for errors.
-    LOG(INFO) << "Closing new blocks";
     CHECK_OK(bm_->CloseBlocks(dirty_blocks));
 
     // Publish the now sync'ed blocks to readers and deleters.
     {
-      std::lock_guard<rw_spinlock> l(lock_);
+      std::lock_guard<simple_spinlock> l(lock_);
       for (WritableBlock* block : dirty_blocks) {
-        written_blocks_.push_back(block->id());
+        InsertOrDie(&written_blocks_, block->id(), 0);
       }
     }
     num_blocks_written += dirty_blocks.size();
     num_bytes_written += total_dirty_bytes;
   }
 
-  LOG(INFO) << Substitute("Thread $0 stopping. Wrote $1 blocks ($2 bytes)",
-                          thread_name, num_blocks_written, num_bytes_written);
   total_blocks_written_.IncrementBy(num_blocks_written);
   total_bytes_written_.IncrementBy(num_bytes_written);
 }
 
 template <typename T>
 void BlockManagerStressTest<T>::ReaderThread() {
-  string thread_name = Thread::current_thread()->name();
-  LOG(INFO) << "Thread " << thread_name << " starting";
-
   Random rand(rand_seed_);
   size_t num_blocks_read = 0;
   size_t num_bytes_read = 0;
   MonoDelta tight_loop(MonoDelta::FromSeconds(0));
   while (!ShouldStop(tight_loop)) {
-    gscoped_ptr<ReadableBlock> block;
+    BlockId block_id;
     {
       // Grab a block at random.
-      shared_lock<rw_spinlock> l(lock_);
-      size_t num_blocks = written_blocks_.size();
-      if (num_blocks > 0) {
-        uint32_t next_id = rand.Uniform(num_blocks);
-        const BlockId& block_id = written_blocks_[next_id];
-        CHECK_OK(bm_->OpenBlock(block_id, &block));
+      std::lock_guard<simple_spinlock> l(lock_);
+      if (written_blocks_.empty()) {
+        continue;
       }
+
+      auto it = written_blocks_.begin();
+      std::advance(it, rand.Uniform(written_blocks_.size()));
+      block_id = it->first;
+      it->second++;
     }
-    if (!block) {
-      continue;
+
+    gscoped_ptr<ReadableBlock> block;
+    CHECK_OK(bm_->OpenBlock(block_id, &block));
+
+    // Done opening the block, make it available for deleting.
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
+      int& openers = FindOrDie(written_blocks_, block_id);
+      openers--;
     }
 
     // Read it fully into memory.
-    string block_id = block->id().ToString();
     uint64_t block_size;
     CHECK_OK(block->Size(&block_size));
     Slice data;
     gscoped_ptr<uint8_t[]> scratch(new uint8_t[block_size]);
     CHECK_OK(block->Read(0, block_size, &data, scratch.get()));
-    LOG(INFO) << "Read " << block_size << " bytes from block " << block_id;
 
     // The first 4 bytes correspond to the PRNG seed.
     CHECK(data.size() >= 4);
     uint32_t seed;
     memcpy(&seed, data.data(), sizeof(uint32_t));
-    LOG(INFO) << "Read seed " << seed << " from block " << block_id;
     Random rand(seed);
 
     // Verify every subsequent number using the PRNG.
@@ -325,45 +343,66 @@ void BlockManagerStressTest<T>::ReaderThread() {
       }
     }
     CHECK_EQ(bytes_processed, data.size());
-    LOG(INFO) << "Finished reading block " << block->id().ToString();
     num_blocks_read++;
     num_bytes_read += block_size;
   }
 
-  LOG(INFO) << Substitute("Thread $0 stopping. Read $1 blocks ($2 bytes)",
-                          thread_name, num_blocks_read, num_bytes_read);
   total_blocks_read_.IncrementBy(num_blocks_read);
   total_bytes_read_.IncrementBy(num_bytes_read);
 }
 
 template <typename T>
 void BlockManagerStressTest<T>::DeleterThread() {
-  string thread_name = Thread::current_thread()->name();
-  LOG(INFO) << "Thread " << thread_name << " starting";
-
+  Random rand(rand_seed_);
   size_t num_blocks_deleted = 0;
-  MonoDelta sleep_time(MonoDelta::FromSeconds(1));
-  while (!ShouldStop(sleep_time)) {
-    // Grab all the blocks we can.
-    vector<BlockId> to_delete;
+  MonoDelta tight_loop(MonoDelta::FromSeconds(0));
+  while (!ShouldStop(tight_loop)) {
+    // Grab a block at random.
+    BlockId to_delete;
     {
-      std::lock_guard<rw_spinlock> l(lock_);
-      to_delete.swap(written_blocks_);
-    }
+      std::lock_guard<simple_spinlock> l(lock_);
+      if (written_blocks_.empty()) {
+        continue;
+      }
+
+      auto it = written_blocks_.begin();
+      std::advance(it, rand.Uniform(written_blocks_.size()));
+      if (it->second > 0) {
+        continue;
+      }
 
-    // And delete them.
-    for (const BlockId& block_id : to_delete) {
-      LOG(INFO) << "Deleting block " << block_id.ToString();
-      CHECK_OK(bm_->DeleteBlock(block_id));
+      to_delete = it->first;
+      written_blocks_.erase(it);
     }
-    num_blocks_deleted += to_delete.size();
+
+    // And delete it.
+    CHECK_OK(bm_->DeleteBlock(to_delete));
+    num_blocks_deleted++;
   }
 
-  LOG(INFO) << Substitute("Thread $0 stopping. Deleted $1 blocks",
-                          thread_name, num_blocks_deleted);
   total_blocks_deleted_.IncrementBy(num_blocks_deleted);
 }
 
+template <>
+int BlockManagerStressTest<FileBlockManager>::GetMaxFdCount() const {
+  return FLAGS_block_manager_max_open_files +
+      // Each open block exists outside the file cache.
+      (FLAGS_num_writer_threads * FLAGS_block_group_size) +
+      // Each reader thread can open a file outside the cache if its lookup
+      // misses. It'll immediately evict an existing fd, but in that brief
+      // window of time both fds may be open simultaneously.
+      FLAGS_num_reader_threads;
+}
+
+template <>
+int BlockManagerStressTest<LogBlockManager>::GetMaxFdCount() const {
+  return FLAGS_block_manager_max_open_files +
+      // If all containers are full, each open block could theoretically
+      // result in a new container, which is two files briefly outside the
+      // cache (before they are inserted and evict other cached files).
+      (FLAGS_num_writer_threads * FLAGS_block_group_size * 2);
+}
+
 // What kinds of BlockManagers are supported?
 #if defined(__linux__)
 typedef ::testing::Types<FileBlockManager, LogBlockManager> BlockManagers;
@@ -375,22 +414,22 @@ TYPED_TEST_CASE(BlockManagerStressTest, BlockManagers);
 TYPED_TEST(BlockManagerStressTest, StressTest) {
   OverrideFlagForSlowTests("test_duration_secs", "30");
   OverrideFlagForSlowTests("block_group_size", "16");
-  OverrideFlagForSlowTests("block_group_bytes",
-                           Substitute("$0", 64 * 1024 * 1024));
-  OverrideFlagForSlowTests("num_bytes_per_write",
-                           Substitute("$0", 64 * 1024));
 
   if ((FLAGS_block_group_size & (FLAGS_block_group_size - 1)) != 0) {
     LOG(FATAL) << "block_group_size " << FLAGS_block_group_size
                << " is not a power of 2";
   }
 
+  PeriodicOpenFdChecker checker(this->env_, this->GetMaxFdCount());
+
   LOG(INFO) << "Running on fresh block manager";
+  checker.Start();
   this->RunTest(FLAGS_test_duration_secs / 2);
   LOG(INFO) << "Running on populated block manager";
   this->bm_.reset(this->CreateBlockManager());
   ASSERT_OK(this->bm_->Open());
   this->RunTest(FLAGS_test_duration_secs / 2);
+  checker.Stop();
 
   LOG(INFO) << "Printing test totals";
   LOG(INFO) << "--------------------";

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 5209156..2e19afd 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -42,12 +42,6 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
-// LogBlockManager opens two files per container, and CloseManyBlocksTest
-// uses one container for each block. To simplify testing (i.e. no need to
-// raise the ulimit on open files), the default is kept low.
-DEFINE_int32(num_blocks_close, 500,
-             "Number of blocks to simultaneously close in CloseManyBlocksTest");
-
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 
@@ -468,6 +462,8 @@ TYPED_TEST(BlockManagerTest, CloseTwiceTest) {
 }
 
 TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
+  const int kNumBlocks = 1000;
+
   if (!AllowSlowTests()) {
     LOG(INFO) << "Not running in slow-tests mode";
     return;
@@ -479,23 +475,24 @@ TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
   Random rand(SeedRandom());
   vector<WritableBlock*> dirty_blocks;
   ElementDeleter deleter(&dirty_blocks);
-  LOG(INFO) << "Creating " <<  FLAGS_num_blocks_close << " blocks";
-  for (int i = 0; i < FLAGS_num_blocks_close; i++) {
-    // Create a block.
-    gscoped_ptr<WritableBlock> written_block;
-    ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  LOG_TIMING(INFO, Substitute("creating $0 blocks", kNumBlocks)) {
+    for (int i = 0; i < kNumBlocks; i++) {
+      // Create a block.
+      gscoped_ptr<WritableBlock> written_block;
+      ASSERT_OK(this->bm_->CreateBlock(&written_block));
+
+      // Write 64k bytes of random data into it.
+      uint8_t data[65536];
+      for (int i = 0; i < sizeof(data); i += sizeof(uint32_t)) {
+        data[i] = rand.Next();
+      }
+      written_block->Append(Slice(data, sizeof(data)));
 
-    // Write 64k bytes of random data into it.
-    uint8_t data[65536];
-    for (int i = 0; i < sizeof(data); i += sizeof(uint32_t)) {
-      data[i] = rand.Next();
+      dirty_blocks.push_back(written_block.release());
     }
-    written_block->Append(Slice(data, sizeof(data)));
-
-    dirty_blocks.push_back(written_block.release());
   }
 
-  LOG_TIMING(INFO, Substitute("closing $0 blocks", FLAGS_num_blocks_close)) {
+  LOG_TIMING(INFO, Substitute("closing $0 blocks", kNumBlocks)) {
     ASSERT_OK(this->bm_->CloseBlocks(dirty_blocks));
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.cc b/src/kudu/fs/block_manager.cc
index 7f921ae..40f1cb0 100644
--- a/src/kudu/fs/block_manager.cc
+++ b/src/kudu/fs/block_manager.cc
@@ -16,6 +16,14 @@
 // under the License.
 
 #include "kudu/fs/block_manager.h"
+
+#include <mutex>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/metrics.h"
 
@@ -28,6 +36,15 @@ DEFINE_bool(block_manager_lock_dirs, true,
             "Note that read-only concurrent usage is still allowed.");
 TAG_FLAG(block_manager_lock_dirs, unsafe);
 
+DEFINE_int64(block_manager_max_open_files, -1,
+             "Maximum number of open file descriptors to be used for data "
+             "blocks. If 0, there is no limit. If -1, Kudu will use half of "
+             "its resource limit as per getrlimit(). This is a soft limit.");
+TAG_FLAG(block_manager_max_open_files, advanced);
+TAG_FLAG(block_manager_max_open_files, evolving);
+
+using strings::Substitute;
+
 namespace kudu {
 namespace fs {
 
@@ -38,5 +55,28 @@ BlockManagerOptions::BlockManagerOptions()
 BlockManagerOptions::~BlockManagerOptions() {
 }
 
+int64_t GetFileCacheCapacityForBlockManager(Env* env) {
+  // Maximize this process' open file limit first, if possible.
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+    env->IncreaseOpenFileLimit();
+  });
+
+  // See block_manager_max_open_files.
+  if (FLAGS_block_manager_max_open_files == -1) {
+    return env->GetOpenFileLimit() / 2;
+  }
+  if (FLAGS_block_manager_max_open_files == 0) {
+    return kint64max;
+  }
+  int64_t file_limit = env->GetOpenFileLimit();
+  LOG_IF(FATAL, FLAGS_block_manager_max_open_files > file_limit) <<
+      Substitute(
+          "Configured open file limit (block_manager_max_open_files) $0 "
+          "exceeds process fd limit (ulimit) $1",
+          FLAGS_block_manager_max_open_files, file_limit);
+  return FLAGS_block_manager_max_open_files;
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index 3f4cfe4..06b09f5 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -35,6 +35,7 @@ DECLARE_bool(block_coalesce_close);
 
 namespace kudu {
 
+class Env;
 class MemTracker;
 class MetricEntity;
 class Slice;
@@ -207,13 +208,19 @@ class BlockManager {
   //
   // Does not modify 'block' on error.
   virtual Status CreateBlock(const CreateBlockOptions& opts,
-                                      gscoped_ptr<WritableBlock>* block) = 0;
+                             gscoped_ptr<WritableBlock>* block) = 0;
 
   // Like the above but uses default options.
   virtual Status CreateBlock(gscoped_ptr<WritableBlock>* block) = 0;
 
   // Opens an existing block for reading.
   //
+  // While it is safe to delete a block that has already been opened, it is
+  // not safe to do so concurrently with the OpenBlock() call itself. In some
+  // block manager implementations this may result in unusual behavior. For
+  // example, OpenBlock() may succeed but subsequent ReadableBlock operations
+  // may fail.
+  //
   // Does not modify 'block' on error.
   virtual Status OpenBlock(const BlockId& block_id,
                            gscoped_ptr<ReadableBlock>* block) = 0;
@@ -271,6 +278,10 @@ class ScopedWritableBlockCloser {
   std::vector<WritableBlock*> blocks_;
 };
 
+// Compute an upper bound for a file cache embedded within a block manager
+// using resource limits obtained from the system.
+int64_t GetFileCacheCapacityForBlockManager(Env* env);
+
 } // namespace fs
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 5c60329..6d2003d 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/fs/file_block_manager.h"
 
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -26,6 +27,7 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
@@ -508,6 +510,15 @@ FileBlockManager::FileBlockManager(Env* env, const BlockManagerOptions& opts)
     mem_tracker_(MemTracker::CreateTracker(-1,
                                            "file_block_manager",
                                            opts.parent_mem_tracker)) {
+
+  int64_t file_cache_capacity = GetFileCacheCapacityForBlockManager(env_);
+  if (file_cache_capacity != kint64max) {
+    file_cache_.reset(new FileCache<RandomAccessFile>("fbm",
+                                                      env_,
+                                                      file_cache_capacity,
+                                                      opts.metric_entity));
+  }
+
   if (opts.metric_entity) {
     metrics_.reset(new internal::BlockManagerMetrics(opts.metric_entity));
   }
@@ -531,7 +542,12 @@ Status FileBlockManager::Open() {
   } else {
     mode = DataDirManager::LockMode::MANDATORY;
   }
-  return dd_manager_.Open(kMaxPaths, mode);
+  RETURN_NOT_OK(dd_manager_.Open(kMaxPaths, mode));
+
+  if (file_cache_) {
+    RETURN_NOT_OK(file_cache_->Init());
+  }
+  return Status::OK();
 }
 
 Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
@@ -606,7 +622,11 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
   VLOG(1) << "Opening block with id " << block_id.ToString() << " at " << path;
 
   shared_ptr<RandomAccessFile> reader;
-  RETURN_NOT_OK(env_util::OpenFileForRandom(env_, path, &reader));
+  if (file_cache_) {
+    RETURN_NOT_OK(file_cache_->OpenExistingFile(path, &reader));
+  } else {
+    RETURN_NOT_OK(env_util::OpenFileForRandom(env_, path, &reader));
+  }
   block->reset(new internal::FileReadableBlock(this, block_id, reader));
   return Status::OK();
 }
@@ -619,7 +639,11 @@ Status FileBlockManager::DeleteBlock(const BlockId& block_id) {
     return Status::NotFound(
         Substitute("Block $0 not found", block_id.ToString()));
   }
-  RETURN_NOT_OK(env_->DeleteFile(path));
+  if (file_cache_) {
+    RETURN_NOT_OK(file_cache_->DeleteFile(path));
+  } else {
+    RETURN_NOT_OK(env_->DeleteFile(path));
+  }
 
   // We don't bother fsyncing the parent directory as there's nothing to be
   // gained by ensuring that the deletion is made durable. Even if we did

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/file_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 223f6db..23806e1 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -33,8 +33,11 @@
 namespace kudu {
 
 class Env;
+template <class FileType>
+class FileCache;
 class MemTracker;
 class MetricEntity;
+class RandomAccessFile;
 class WritableFile;
 
 namespace fs {
@@ -115,6 +118,9 @@ class FileBlockManager : public BlockManager {
   // Manages and owns all of the block manager's data directories.
   DataDirManager dd_manager_;
 
+  // Manages files opened for reading.
+  std::unique_ptr<FileCache<RandomAccessFile>> file_cache_;
+
   // For generating block IDs.
   ThreadSafeRandom rand_;
   AtomicInt<int64_t> next_block_id_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 52fd9c5..7877cc4 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -36,6 +36,7 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
@@ -102,8 +103,8 @@ using internal::LogBlock;
 using internal::LogBlockContainer;
 using pb_util::ReadablePBContainerFile;
 using pb_util::WritablePBContainerFile;
+using std::shared_ptr;
 using std::string;
-using std::unordered_set;
 using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
@@ -278,7 +279,7 @@ class LogBlockContainer {
  private:
   LogBlockContainer(LogBlockManager* block_manager, DataDir* data_dir,
                     unique_ptr<WritablePBContainerFile> metadata_file,
-                    unique_ptr<RWFile> data_file);
+                    shared_ptr<RWFile> data_file);
 
   // Performs sanity checks on a block record.
   void CheckBlockRecord(const BlockRecordPB& record,
@@ -301,7 +302,7 @@ class LogBlockContainer {
 
   // Opened file handles to the container's files.
   unique_ptr<WritablePBContainerFile> metadata_file_;
-  unique_ptr<RWFile> data_file_;
+  shared_ptr<RWFile> data_file_;
 
   // The amount of data written thus far in the container.
   int64_t total_bytes_written_ = 0;
@@ -317,7 +318,7 @@ LogBlockContainer::LogBlockContainer(
     LogBlockManager* block_manager,
     DataDir* data_dir,
     unique_ptr<WritablePBContainerFile> metadata_file,
-    unique_ptr<RWFile> data_file)
+    shared_ptr<RWFile> data_file)
     : block_manager_(block_manager),
       data_dir_(data_dir),
       metadata_file_(std::move(metadata_file)),
@@ -363,13 +364,31 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
   } while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() ||
                          data_status.IsAlreadyPresent()));
   if (metadata_status.ok() && data_status.ok()) {
-    unique_ptr<WritablePBContainerFile> metadata_file(
-        new WritablePBContainerFile(std::move(metadata_writer)));
+    unique_ptr<WritablePBContainerFile> metadata_file;
+    shared_ptr<RWFile> cached_data_file;
+
+    if (block_manager->file_cache_) {
+      metadata_writer.reset();
+      shared_ptr<RWFile> cached_metadata_writer;
+      RETURN_NOT_OK(block_manager->file_cache_->OpenExistingFile(
+          metadata_path, &cached_metadata_writer));
+      metadata_file.reset(new WritablePBContainerFile(
+          std::move(cached_metadata_writer)));
+
+      data_file.reset();
+      RETURN_NOT_OK(block_manager->file_cache_->OpenExistingFile(
+          data_path, &cached_data_file));
+    } else {
+      metadata_file.reset(new WritablePBContainerFile(
+          std::move(metadata_writer)));
+      cached_data_file = std::move(data_file);
+    }
     RETURN_NOT_OK(metadata_file->Init(BlockRecordPB()));
+
     container->reset(new LogBlockContainer(block_manager,
                                            dir,
                                            std::move(metadata_file),
-                                           std::move(data_file)));
+                                           std::move(cached_data_file)));
     VLOG(1) << "Created log block container " << (*container)->ToString();
   }
 
@@ -424,23 +443,37 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   }
 
   // Open the existing metadata and data files for writing.
-  unique_ptr<RWFile> metadata_writer;
-  RWFileOptions wr_opts;
-  wr_opts.mode = Env::OPEN_EXISTING;
-
-  RETURN_NOT_OK(env->NewRWFile(wr_opts,
-                               metadata_path,
-                               &metadata_writer));
-  unique_ptr<WritablePBContainerFile> metadata_pb_writer(
-      new WritablePBContainerFile(std::move(metadata_writer)));
+  unique_ptr<WritablePBContainerFile> metadata_pb_writer;
+  shared_ptr<RWFile> data_file;
+
+  if (block_manager->file_cache_) {
+    shared_ptr<RWFile> metadata_writer;
+    RETURN_NOT_OK(block_manager->file_cache_->OpenExistingFile(
+        metadata_path, &metadata_writer));
+    metadata_pb_writer.reset(new WritablePBContainerFile(
+        std::move(metadata_writer)));
+
+    RETURN_NOT_OK(block_manager->file_cache_->OpenExistingFile(
+        data_path, &data_file));
+  } else {
+    RWFileOptions wr_opts;
+    wr_opts.mode = Env::OPEN_EXISTING;
+
+    unique_ptr<RWFile> metadata_writer;
+    RETURN_NOT_OK(block_manager->env_->NewRWFile(wr_opts,
+                                                 metadata_path,
+                                                 &metadata_writer));
+    metadata_pb_writer.reset(new WritablePBContainerFile(
+        std::move(metadata_writer)));
+
+    unique_ptr<RWFile> uw;
+    RETURN_NOT_OK(block_manager->env_->NewRWFile(wr_opts,
+                                                 data_path,
+                                                 &uw));
+    data_file = std::move(uw);
+  }
   RETURN_NOT_OK(metadata_pb_writer->Reopen());
 
-  unique_ptr<RWFile> data_file;
-  RWFileOptions rw_opts;
-  rw_opts.mode = Env::OPEN_EXISTING;
-  RETURN_NOT_OK(env->NewRWFile(rw_opts,
-                               data_path,
-                               &data_file));
   uint64_t data_file_size;
   RETURN_NOT_OK(data_file->Size(&data_file_size));
 
@@ -1140,6 +1173,14 @@ LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
     read_only_(opts.read_only),
     next_block_id_(1) {
 
+  int64_t file_cache_capacity = GetFileCacheCapacityForBlockManager(env_);
+  if (file_cache_capacity != kint64max) {
+    file_cache_.reset(new FileCache<RWFile>("lbm",
+                                            env_,
+                                            file_cache_capacity,
+                                            opts.metric_entity));
+  }
+
   // HACK: when running in a test environment, we often instantiate many
   // LogBlockManagers in the same process, eg corresponding to different
   // tablet servers in a minicluster, or due to running many separate test
@@ -1194,6 +1235,10 @@ Status LogBlockManager::Open() {
   }
   RETURN_NOT_OK(dd_manager_.Open(kuint32max, mode));
 
+  if (file_cache_) {
+    RETURN_NOT_OK(file_cache_->Init());
+  }
+
   vector<Status> statuses(dd_manager_.data_dirs().size());
   int i = 0;
   for (const auto& dd : dd_manager_.data_dirs()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 2700610..817d8af 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -19,7 +19,6 @@
 #define KUDU_FS_LOG_BLOCK_MANAGER_H
 
 #include <deque>
-#include <gtest/gtest_prod.h>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -27,6 +26,8 @@
 #include <utility>
 #include <vector>
 
+#include <gtest/gtest_prod.h>
+
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
@@ -40,7 +41,10 @@
 
 namespace kudu {
 class Env;
+template <class FileType>
+class FileCache;
 class MetricEntity;
+class RWFile;
 class ThreadPool;
 
 namespace fs {
@@ -277,6 +281,9 @@ class LogBlockManager : public BlockManager {
   // Manages and owns all of the block manager's data directories.
   DataDirManager dd_manager_;
 
+  // Manages container files opened for reading and writing.
+  std::unique_ptr<FileCache<RWFile>> file_cache_;
+
   // Maps block IDs to blocks that are now readable, either because they
   // already existed on disk when the block manager was opened, or because
   // they're WritableBlocks that were closed.

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 847d2a2..ea9d266 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -176,21 +176,13 @@ TEST_P(TestRpc, TestWrongService) {
                       "not registered on TestServer");
 }
 
-namespace {
-int GetOpenFileLimit() {
-  struct rlimit limit;
-  PCHECK(getrlimit(RLIMIT_NOFILE, &limit) == 0);
-  return limit.rlim_cur;
-}
-} // anonymous namespace
-
 // Test that we can still make RPC connections even if many fds are in use.
 // This is a regression test for KUDU-650.
 TEST_P(TestRpc, TestHighFDs) {
   // This test can only run if ulimit is set high.
   const int kNumFakeFiles = 3500;
   const int kMinUlimit = kNumFakeFiles + 100;
-  if (GetOpenFileLimit() < kMinUlimit) {
+  if (env_->GetOpenFileLimit() < kMinUlimit) {
     LOG(INFO) << "Test skipped: must increase ulimit -n to at least " << kMinUlimit;
     return;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index adc2508..2748413 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -287,6 +287,15 @@ class Env {
   // Get the total amount of RAM installed on this machine.
   virtual Status GetTotalRAMBytes(int64_t* ram) = 0;
 
+  // Get the max number of file descriptors that this process can open.
+  virtual int64_t GetOpenFileLimit() = 0;
+
+  // Increase the max number of file descriptors that this process can open as
+  // much as possible. On UNIX platforms, this means increasing the
+  // RLIMIT_NOFILE resource soft limit (the limit actually enforced by the
+  // kernel) to be equal to the hard limit.
+  virtual void IncreaseOpenFileLimit() = 0;
+
  private:
   // No copying allowed
   Env(const Env&);

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 33bfcac..458cdd0 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -10,6 +10,7 @@
 #include <limits.h>
 #include <pthread.h>
 #include <sys/mman.h>
+#include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/statvfs.h>
 #include <sys/time.h>
@@ -1222,6 +1223,31 @@ class PosixEnv : public Env {
     return Status::OK();
   }
 
+  virtual int64_t GetOpenFileLimit() OVERRIDE {
+    // Returns the RLIMIT_NOFILE soft limit; a getrlimit() failure is fatal.
+    struct rlimit l;
+    PCHECK(getrlimit(RLIMIT_NOFILE, &l) == 0);
+    return l.rlim_cur;
+  }
+
+  virtual void IncreaseOpenFileLimit() OVERRIDE {
+    // Raises the RLIMIT_NOFILE soft limit to the hard limit. Both rlimit calls
+    // are PCHECKed, so a failure aborts the process instead of being returned.
+    // NOTE(review): confirm setrlimit() accepts rlim_max on all platforms.
+    // This change is logged because it is process-wide.
+    struct rlimit l;
+    PCHECK(getrlimit(RLIMIT_NOFILE, &l) == 0);
+    if (l.rlim_cur < l.rlim_max) {
+      LOG(INFO) << Substitute("Raising process file limit from $0 to $1",
+                              l.rlim_cur, l.rlim_max);
+      l.rlim_cur = l.rlim_max;
+      PCHECK(setrlimit(RLIMIT_NOFILE, &l) == 0);
+    } else {
+      LOG(INFO) << Substitute("Not raising process file limit of $0; it is "
+          "already as high as it can go", l.rlim_cur);
+    }
+  }
+
  private:
   // unique_ptr Deleter implementation for fts_close
   struct FtsCloser {

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index d4eba50..44c7534 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -541,7 +541,7 @@ void TruncateFields(Message* message, int max_len) {
   }
 }
 
-WritablePBContainerFile::WritablePBContainerFile(unique_ptr<RWFile> writer)
+WritablePBContainerFile::WritablePBContainerFile(shared_ptr<RWFile> writer)
   : state_(FileState::NOT_INITIALIZED),
     offset_(0),
     version_(kPBContainerDefaultVersion),
@@ -756,7 +756,7 @@ void WritablePBContainerFile::PopulateDescriptorSet(
   all_descs.Swap(output);
 }
 
-ReadablePBContainerFile::ReadablePBContainerFile(unique_ptr<RandomAccessFile> reader)
+ReadablePBContainerFile::ReadablePBContainerFile(shared_ptr<RandomAccessFile> reader)
   : state_(FileState::NOT_INITIALIZED),
     version_(kPBContainerInvalidVersion),
     offset_(0),

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/util/pb_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.h b/src/kudu/util/pb_util.h
index 4c2a084..eef5d27 100644
--- a/src/kudu/util/pb_util.h
+++ b/src/kudu/util/pb_util.h
@@ -252,7 +252,7 @@ class WritablePBContainerFile {
  public:
 
   // Initializes the class instance; writer must be open.
-  explicit WritablePBContainerFile(std::unique_ptr<RWFile> writer);
+  explicit WritablePBContainerFile(std::shared_ptr<RWFile> writer);
 
   // Closes the container if not already closed.
   ~WritablePBContainerFile();
@@ -347,7 +347,7 @@ class WritablePBContainerFile {
   int version_;
 
   // File writer.
-  std::unique_ptr<RWFile> writer_;
+  std::shared_ptr<RWFile> writer_;
 };
 
 // Protobuf container file opened for reading.
@@ -358,7 +358,7 @@ class ReadablePBContainerFile {
  public:
 
   // Initializes the class instance; reader must be open.
-  explicit ReadablePBContainerFile(std::unique_ptr<RandomAccessFile> reader);
+  explicit ReadablePBContainerFile(std::shared_ptr<RandomAccessFile> reader);
 
   // Closes the file if not already closed.
   ~ReadablePBContainerFile();
@@ -421,7 +421,7 @@ class ReadablePBContainerFile {
   // Wrapped in a unique_ptr so that clients need not include PB headers.
   std::unique_ptr<google::protobuf::FileDescriptorSet> protos_;
 
-  std::unique_ptr<RandomAccessFile> reader_;
+  std::shared_ptr<RandomAccessFile> reader_;
 };
 
 // Convenience functions for protobuf containers holding just one record.