You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/12/07 21:47:06 UTC

[1/2] kudu git commit: util: add file cache

Repository: kudu
Updated Branches:
  refs/heads/master a70c90500 -> 3fa9db761


util: add file cache

This commit introduces a new kind of cache: the file cache. Its purpose is
to enforce an upper bound on the process' open file descriptors with as
little overhead as possible.

During implementation, I followed these design principles:
1. Reuse the existing LRU cache implementation in cache.{cc,h} so that the
   file cache can take advantage of any future improvements there.
2. Preserve existing encapsulation as much as possible. For example, reuse
   the various open file interfaces defined in util/env.h instead of blowing
   them wide open to get at the underlying fds.
3. Even though the primary use case for the file cache is the log block
   manager (to allow for many small containers on el6), the implementation
   should be generic enough that it can be used in any block manager, or
   anywhere else in a Kudu server process.

In particular, I went back and forth between templated and non-templated
approaches. I settled on using templates once Dan helped me through some of
the thornier issues. This approach drops the capability of opening the same
file multiple "ways" in exchange for far less complexity around deferred
deletion. Going down the template route also means the file cache probably
can't be used outside the block managers.

For more design and implementation notes, see the big block comment in
util/file_cache.h.

Other goodies in this commit:
- A correctness test for the file cache.
- A multi-threaded stress test for the file cache.
- A "checker" (in the style of PeriodicWebUIChecker) for verifying that the
  number of open file handles is below what we expect it to be.
- A Cache::Handle deleter, for use with std::unique_ptr.
- A gflag for overriding the number of shards used in ShardedLRUCache, used
  to simplify capacity-based tests.

Change-Id: I26d02f71b0a9644de0b669875941adae5f426345
Reviewed-on: http://gerrit.cloudera.org:8080/5146
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6a124f04
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6a124f04
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6a124f04

Branch: refs/heads/master
Commit: 6a124f040c8ee2797b3a45c88f4a2a56176ca339
Parents: a70c905
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Nov 4 19:06:59 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Dec 7 21:15:01 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/CMakeLists.txt            |   3 +
 src/kudu/util/cache.cc                  |  15 +-
 src/kudu/util/cache.h                   |  39 +-
 src/kudu/util/file_cache-stress-test.cc | 396 ++++++++++++++++++
 src/kudu/util/file_cache-test-util.h    |  84 ++++
 src/kudu/util/file_cache-test.cc        | 291 +++++++++++++
 src/kudu/util/file_cache.cc             | 587 +++++++++++++++++++++++++++
 src/kudu/util/file_cache.h              | 186 +++++++++
 src/kudu/util/test_util.cc              |  28 ++
 src/kudu/util/test_util.h               |   3 +
 10 files changed, 1628 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index a89cc0b..25bac24 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -127,6 +127,7 @@ set(UTIL_SRCS
   faststring.cc
   failure_detector.cc
   fault_injection.cc
+  file_cache.cc
   flags.cc
   flag_tags.cc
   group_varint.cc
@@ -302,6 +303,8 @@ ADD_KUDU_TEST(env-test LABELS no_tsan)
 ADD_KUDU_TEST(env_util-test)
 ADD_KUDU_TEST(errno-test)
 ADD_KUDU_TEST(failure_detector-test)
+ADD_KUDU_TEST(file_cache-test)
+ADD_KUDU_TEST(file_cache-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(flag_tags-test)
 ADD_KUDU_TEST(flags-test)
 ADD_KUDU_TEST(group_varint-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/cache.cc b/src/kudu/util/cache.cc
index 9d46121..4700fb3 100644
--- a/src/kudu/util/cache.cc
+++ b/src/kudu/util/cache.cc
@@ -2,13 +2,15 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include <glog/logging.h>
+#include <cstdlib>
 #include <memory>
 #include <mutex>
-#include <stdlib.h>
 #include <string>
 #include <vector>
 
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
 #include "kudu/gutil/atomic_refcount.h"
 #include "kudu/gutil/bits.h"
 #include "kudu/gutil/hash/city.h"
@@ -19,6 +21,7 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/cache.h"
 #include "kudu/util/cache_metrics.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
@@ -27,6 +30,11 @@
 #include "kudu/util/nvm_cache.h"
 #endif
 
+// Useful in tests that require accurate cache capacity accounting.
+DEFINE_bool(cache_force_single_shard, false,
+            "Override all cache implementations to use just one shard");
+TAG_FLAG(cache_force_single_shard, hidden);
+
 namespace kudu {
 
 class MetricEntity;
@@ -373,7 +381,8 @@ void LRUCache::Erase(const Slice& key, uint32_t hash) {
 // Determine the number of bits of the hash that should be used to determine
 // the cache shard. This, in turn, determines the number of shards.
 int DetermineShardBits() {
-  int bits = Bits::Log2Ceiling(base::NumCPUs());
+  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
+      0 : Bits::Log2Ceiling(base::NumCPUs());
   VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache.";
   return bits;
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/cache.h b/src/kudu/util/cache.h
index c8e0f31..af0a9ae 100644
--- a/src/kudu/util/cache.h
+++ b/src/kudu/util/cache.h
@@ -18,7 +18,8 @@
 #ifndef KUDU_UTIL_CACHE_H_
 #define KUDU_UTIL_CACHE_H_
 
-#include <stdint.h>
+#include <cstdint>
+#include <memory>
 #include <string>
 
 #include "kudu/gutil/macros.h"
@@ -59,6 +60,42 @@ class Cache {
   // Opaque handle to an entry stored in the cache.
   struct Handle { };
 
+  // Custom handle "deleter", primarily intended for use with std::unique_ptr.
+  //
+  // Sample usage:
+  //
+  //   Cache* cache = NewLRUCache(...);
+  //   ...
+  //   {
+  //     unique_ptr<Cache::Handle, Cache::HandleDeleter> h(
+  //       cache->Lookup(...), Cache::HandleDeleter(cache));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  // Or:
+  //
+  //   Cache* cache = NewLRUCache(...);
+  //   ...
+  //   {
+  //     Cache::UniqueHandle h(cache->Lookup(...), Cache::HandleDeleter(cache));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  class HandleDeleter {
+   public:
+    explicit HandleDeleter(Cache* c)
+        : c_(c) {
+    }
+
+    void operator()(Cache::Handle* h) const {
+      c_->Release(h);
+    }
+
+   private:
+    Cache* c_;
+  };
+  typedef std::unique_ptr<Handle, HandleDeleter> UniqueHandle;
+
   // Passing EXPECT_IN_CACHE will increment the hit/miss metrics that track the number of times
   // blocks were requested that the users were hoping to get the block from the cache, along with
   // with the basic metrics.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/file_cache-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache-stress-test.cc b/src/kudu/util/file_cache-stress-test.cc
new file mode 100644
index 0000000..387936a
--- /dev/null
+++ b/src/kudu/util/file_cache-stress-test.cc
@@ -0,0 +1,396 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <deque>
+#include <iterator>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/file_cache-test-util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+// Like CHECK_OK(), but dumps the contents of 'cache_' before failing. The
+// output of ToDebugString() tends to be long enough that LOG() truncates it,
+// so it is split into lines before logging. Expands to a single statement
+// with no trailing semicolon; callers supply their own.
+#define TEST_CHECK_OK(to_call) do {                                       \
+    const Status& _s = (to_call);                                         \
+    if (!_s.ok()) {                                                       \
+      LOG(INFO) << "Dumping cache contents";                              \
+      vector<string> lines = strings::Split(cache_.ToDebugString(), "\n", \
+                                            strings::SkipEmpty());        \
+      for (const auto& l : lines) {                                       \
+        LOG(INFO) << l;                                                   \
+      }                                                                   \
+    }                                                                     \
+    CHECK(_s.ok()) << "Bad status: " << _s.ToString();                    \
+  } while (0)
+
+DECLARE_bool(never_fsync);
+
+// This default value is friendly to many n-CPU configurations.
+DEFINE_int32(test_max_open_files, 192, "Maximum number of open files enforced "
+             "by the cache. Should be a multiple of the number of CPUs on the "
+             "system.");
+
+DEFINE_int32(test_num_producer_threads, 1, "Number of producer threads");
+DEFINE_int32(test_num_consumer_threads, 4, "Number of consumer threads");
+DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
+
+using std::deque;
+using std::shared_ptr;
+using std::thread;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+template <class FileType>
+class FileCacheStressTest : public KuduTest {
+ public:
+  typedef unordered_map<string, unordered_map<string, int>> MetricMap;
+
+  FileCacheStressTest()
+      : cache_("test",
+               env_,
+               FLAGS_test_max_open_files,
+               scoped_refptr<MetricEntity>()),
+        rand_(SeedRandom()),
+        running_(1) {
+
+    // Increases total number of files manipulated, which is the right kind
+    // of stress for this test.
+    FLAGS_never_fsync = true;
+  }
+
+  void SetUp() override {
+    ASSERT_OK(cache_.Init());
+  }
+
+  void ProducerThread() {
+    Random rand(rand_.Next32());
+    ObjectIdGenerator oid_generator;
+    MetricMap metrics;
+
+    do {
+      // Create a new file with some (0-32k) random data in it.
+      string next_file_name = GetTestPath(oid_generator.Next());
+      {
+        unique_ptr<WritableFile> next_file;
+        CHECK_OK(env_->NewWritableFile(next_file_name, &next_file));
+        uint8_t buf[rand.Uniform((32 * 1024) - 1) + 1];
+        CHECK_OK(next_file->Append(GenerateRandomChunk(buf, sizeof(buf), &rand)));
+        CHECK_OK(next_file->Close());
+      }
+      {
+        std::lock_guard<simple_spinlock> l(lock_);
+        InsertOrDie(&available_files_, next_file_name, 0);
+      }
+      metrics[BaseName(next_file_name)]["create"] = 1;
+    } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1)));
+
+    // Update the global metrics map.
+    MergeNewMetrics(std::move(metrics));
+  }
+
+  void ConsumerThread() {
+    // Each thread has its own PRNG to minimize contention on the main one.
+    Random rand(rand_.Next32());
+
+    // Active opened files in this thread.
+    deque<shared_ptr<FileType>> files;
+
+    // Metrics generated by this thread. They will be merged into the main
+    // metrics map when the thread is done.
+    MetricMap metrics;
+
+    do {
+      // Pick an action to perform. Distribution:
+      // 20% open
+      // 15% close
+      // 35% read
+      // 20% write
+      // 10% delete
+      int next_action = rand.Uniform(100);
+
+      if (next_action < 20) {
+        // Open an existing file.
+        string to_open;
+        if (!GetRandomFile(OPEN, &rand, &to_open)) {
+          continue;
+        }
+        shared_ptr<FileType> new_file;
+        TEST_CHECK_OK(cache_.OpenExistingFile(to_open, &new_file));
+        FinishedOpen(to_open);
+        metrics[BaseName(to_open)]["open"]++;
+        files.emplace_back(new_file);
+      } else if (next_action < 35) {
+        // Close a file.
+        if (files.empty()) {
+          continue;
+        }
+        shared_ptr<FileType> file = files.front();
+        files.pop_front();
+        metrics[BaseName(file->filename())]["close"]++;
+      } else if (next_action < 70) {
+        // Read a random chunk from a file.
+        TEST_CHECK_OK(ReadRandomChunk(files, &metrics, &rand));
+      } else if (next_action < 90) {
+        // Write a random chunk to a file.
+        TEST_CHECK_OK(WriteRandomChunk(files, &metrics, &rand));
+      } else if (next_action < 100) {
+        // Delete a file.
+        string to_delete;
+        if (!GetRandomFile(DELETE, &rand, &to_delete)) {
+          continue;
+        }
+        TEST_CHECK_OK(cache_.DeleteFile(to_delete));
+        metrics[BaseName(to_delete)]["delete"]++;
+      }
+    } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1)));
+
+    // Update the global metrics map.
+    MergeNewMetrics(std::move(metrics));
+  }
+
+ protected:
+  void NotifyThreads() { running_.CountDown(); }
+
+  const MetricMap& metrics() const { return metrics_; }
+
+ private:
+  enum GetMode {
+    OPEN,
+    DELETE
+  };
+
+  // Retrieve a random file name to be either opened or deleted. If deleting,
+  // the file name is made inaccessible to future operations.
+  bool GetRandomFile(GetMode mode, Random* rand, string* out) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (available_files_.empty()) {
+      return false;
+    }
+
+    // This is linear time, but it's simpler than managing multiple data
+    // structures.
+    auto it = available_files_.begin();
+    std::advance(it, rand->Uniform(available_files_.size()));
+
+    // It's unsafe to delete a file that is still being opened.
+    if (mode == DELETE && it->second > 0) {
+      return false;
+    }
+
+    *out = it->first;
+    if (mode == OPEN) {
+      it->second++;
+    } else {
+      available_files_.erase(it);
+    }
+    return true;
+  }
+
+  // Signal that a previously in-progress open has finished, allowing the file
+  // in question to be deleted.
+  void FinishedOpen(const string& opened) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    int& openers = FindOrDie(available_files_, opened);
+    openers--;
+  }
+
+  // Reads a random chunk of data from a random file in 'files'. On success,
+  // writes to 'metrics'.
+  static Status ReadRandomChunk(const deque<shared_ptr<FileType>>& files,
+                                MetricMap* metrics,
+                                Random* rand) {
+    if (files.empty()) {
+      return Status::OK();
+    }
+    const shared_ptr<FileType>& file = files[rand->Uniform(files.size())];
+
+    uint64_t file_size;
+    RETURN_NOT_OK(file->Size(&file_size));
+    uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
+    size_t len = file_size > 0 ? rand->Uniform(file_size - off) : 0;
+    unique_ptr<uint8_t[]> scratch(new uint8_t[len]);
+    Slice s;
+    RETURN_NOT_OK(file->Read(off, len, &s, scratch.get()));
+
+    (*metrics)[BaseName(file->filename())]["read"]++;
+    return Status::OK();
+  }
+
+  // Writes a random chunk of data to a random file in 'files'. On success,
+  // updates 'metrics'.
+  //
+  // No-op for file implementations that don't support writing.
+  static Status WriteRandomChunk(const deque<shared_ptr<FileType>>& files,
+                                 MetricMap* metrics,
+                                 Random* rand);
+
+  // Fills 'buffer' with a random, word-aligned number of random bytes (< 'max_length').
+  static Slice GenerateRandomChunk(uint8_t* buffer, size_t max_length, Random* rand) {
+    size_t len = rand->Uniform(max_length) / sizeof(uint32_t) * sizeof(uint32_t);
+    for (size_t i = 0; i < len / sizeof(uint32_t); i++) {  // 'i' indexes words, not bytes.
+      reinterpret_cast<uint32_t*>(buffer)[i] = rand->Next32();
+    }
+    return Slice(buffer, len);
+  }
+
+  // Merge the metrics in 'new_metrics' into the global metric map.
+  void MergeNewMetrics(MetricMap new_metrics) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (const auto& file_action_pair : new_metrics) {
+      for (const auto& action_count_pair : file_action_pair.second) {
+        metrics_[file_action_pair.first][action_count_pair.first] += action_count_pair.second;
+      }
+    }
+  }
+
+  FileCache<FileType> cache_;
+
+  // Used to seed per-thread PRNGs.
+  ThreadSafeRandom rand_;
+
+  // Drops to zero when the test ends.
+  CountDownLatch running_;
+
+  // Protects 'available_files_' and 'metrics_'.
+  simple_spinlock lock_;
+
+  // Contains files produced by producer threads and ready for consumption by
+  // consumer threads.
+  //
+  // Each entry is a file name and the number of in-progress openers. To delete
+  // a file, there must be no openers.
+  unordered_map<string, int> available_files_;
+
+  // For each file name, tracks the count of consumer actions performed.
+  //
+  // Only updated at test end.
+  MetricMap metrics_;
+};
+
+template <>
+Status FileCacheStressTest<RWFile>::WriteRandomChunk(
+    const deque<shared_ptr<RWFile>>& files,
+    MetricMap* metrics,
+    Random* rand) {
+  if (files.empty()) {
+    return Status::OK();
+  }
+  const shared_ptr<RWFile>& file = files[rand->Uniform(files.size())];
+
+  uint64_t file_size;
+  RETURN_NOT_OK(file->Size(&file_size));
+  uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
+  uint8 buf[64];
+  RETURN_NOT_OK(file->Write(off, GenerateRandomChunk(buf, sizeof(buf), rand)));
+  (*metrics)[BaseName(file->filename())]["write"]++;
+  return Status::OK();
+}
+
+template <>
+Status FileCacheStressTest<RandomAccessFile>::WriteRandomChunk(
+    const deque<shared_ptr<RandomAccessFile>>& /* unused */,
+    MetricMap* /* unused */,
+    Random* /* unused */) {
+  return Status::OK();
+}
+
+typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
+TYPED_TEST_CASE(FileCacheStressTest, FileTypes);
+
+TYPED_TEST(FileCacheStressTest, TestStress) {
+  OverrideFlagForSlowTests("test_num_producer_threads", "2");
+  OverrideFlagForSlowTests("test_num_consumer_threads", "8");
+  OverrideFlagForSlowTests("test_duration_secs", "30");
+
+  // Start the threads.
+  PeriodicOpenFdChecker checker(
+      this->env_,
+      FLAGS_test_max_open_files +       // cache capacity
+      FLAGS_test_num_producer_threads + // files being written
+      FLAGS_test_num_consumer_threads); // files being opened
+  checker.Start();
+  vector<thread> producers;
+  for (int i = 0; i < FLAGS_test_num_producer_threads; i++) {
+    producers.emplace_back(&FileCacheStressTest<TypeParam>::ProducerThread, this);
+  }
+  vector<thread> consumers;
+  for (int i = 0; i < FLAGS_test_num_consumer_threads; i++) {
+    consumers.emplace_back(&FileCacheStressTest<TypeParam>::ConsumerThread, this);
+  }
+
+  // Let the test run.
+  SleepFor(MonoDelta::FromSeconds(FLAGS_test_duration_secs));
+
+  // Stop the threads.
+  this->NotifyThreads();
+  checker.Stop();
+  for (auto& p : producers) {
+    p.join();
+  }
+  for (auto& c : consumers) {
+    c.join();
+  }
+
+  // Log the metrics.
+  unordered_map<string, int> action_counts;
+  for (const auto& file_action_pair : this->metrics()) {
+    for (const auto& action_count_pair : file_action_pair.second) {
+      VLOG(2) << Substitute("$0: $1: $2",
+                            file_action_pair.first,
+                            action_count_pair.first,
+                            action_count_pair.second);
+      action_counts[action_count_pair.first] += action_count_pair.second;
+    }
+  }
+  for (const auto& action_count_pair : action_counts) {
+    LOG(INFO) << Substitute("$0: $1",
+                            action_count_pair.first,
+                            action_count_pair.second);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/file_cache-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache-test-util.h b/src/kudu/util/file_cache-test-util.h
new file mode 100644
index 0000000..8b0bcce
--- /dev/null
+++ b/src/kudu/util/file_cache-test-util.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <thread>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Periodically checks the number of open file descriptors belonging to this
+// process, crashing if it exceeds some upper bound.
+class PeriodicOpenFdChecker {
+ public:
+  PeriodicOpenFdChecker(Env* env, int upper_bound)
+    : env_(env),
+      initial_fd_count_(CountOpenFds(env)),
+      max_fd_count_(upper_bound + initial_fd_count_),
+      running_(1),
+      started_(false) {}
+
+  ~PeriodicOpenFdChecker() { Stop(); }
+
+  void Start() {
+    DCHECK(!started_);
+    running_.Reset(1);
+    check_thread_ = std::thread(&PeriodicOpenFdChecker::CheckThread, this);
+    started_ = true;
+  }
+
+  void Stop() {
+    if (started_) {
+      running_.CountDown();
+      check_thread_.join();
+      started_ = false;
+    }
+  }
+
+ private:
+  void CheckThread() {
+    LOG(INFO) << strings::Substitute("Periodic open fd checker starting "
+        "(initial: $0 max: $1)",
+        initial_fd_count_, max_fd_count_);
+    do {
+      int open_fd_count = CountOpenFds(env_);
+      KLOG_EVERY_N_SECS(INFO, 1) << strings::Substitute("Open fd count: $0/$1",
+                                                        open_fd_count,
+                                                        max_fd_count_);
+      CHECK_LE(open_fd_count, max_fd_count_);
+    } while (!running_.WaitFor(MonoDelta::FromMilliseconds(100)));
+  }
+
+  Env* env_;
+  const int initial_fd_count_;
+  const int max_fd_count_;
+
+  CountDownLatch running_;
+  std::thread check_thread_;
+  bool started_;
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/file_cache-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc
new file mode 100644
index 0000000..2347ba9
--- /dev/null
+++ b/src/kudu/util/file_cache-test.cc
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(cache_force_single_shard);
+DECLARE_int32(file_cache_expiry_period_ms);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+template <class FileType>
+class FileCacheTest : public KuduTest {
+ public:
+  FileCacheTest()
+      : rand_(SeedRandom()),
+        initial_open_fds_(CountOpenFds(env_)) {
+    // Simplify testing of the actual cache capacity.
+    FLAGS_cache_force_single_shard = true;
+
+    // Speed up tests that check the number of descriptors.
+    FLAGS_file_cache_expiry_period_ms = 1;
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ReinitCache(1));
+  }
+
+ protected:
+  Status ReinitCache(int max_open_files) {
+    cache_.reset(new FileCache<FileType>("test",
+                                         env_,
+                                         max_open_files,
+                                         nullptr));
+    return cache_->Init();
+  }
+
+  Status WriteTestFile(const string& name, const string& data) {
+    unique_ptr<RWFile> f;
+    RETURN_NOT_OK(env_->NewRWFile(name, &f));
+    RETURN_NOT_OK(f->Write(0, data));
+    return Status::OK();
+  }
+
+  void AssertFdsAndDescriptors(int num_expected_fds,
+                               int num_expected_descriptors) {
+    ASSERT_EQ(initial_open_fds_ + num_expected_fds, CountOpenFds(env_));
+
+    // The expiry thread may take some time to run.
+    AssertEventually([&]() {
+      ASSERT_EQ(num_expected_descriptors, cache_->NumDescriptorsForTests());
+    });
+  }
+
+  Random rand_;
+  const int initial_open_fds_;
+  unique_ptr<FileCache<FileType>> cache_;
+};
+
+typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
+TYPED_TEST_CASE(FileCacheTest, FileTypes);
+
+TYPED_TEST(FileCacheTest, TestBasicOperations) {
+  // Open a non-existent file.
+  {
+    shared_ptr<TypeParam> f;
+    ASSERT_TRUE(this->cache_->OpenExistingFile(
+        "/does/not/exist", &f).IsNotFound());
+    NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
+  }
+
+  const string kFile1 = this->GetTestPath("foo");
+  const string kFile2 = this->GetTestPath("bar");
+  const string kData1 = "test data 1";
+  const string kData2 = "test data 2";
+
+  // Create some test files.
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
+
+  {
+    // Open a test file. It should open an fd and create a descriptor.
+    shared_ptr<TypeParam> f1;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f1));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+
+    // Spot check the test data by comparing sizes.
+    for (int i = 0; i < 3; i++) {
+      uint64_t size;
+      ASSERT_OK(f1->Size(&size));
+      ASSERT_EQ(kData1.size(), size);
+      NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+    }
+
+    // Open the same file a second time. It should reuse the existing
+    // descriptor and not open a second fd.
+    shared_ptr<TypeParam> f2;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_TRUE(uh.get());
+    }
+
+    // Open a second file. This will create a new descriptor, but evict the fd
+    // opened for the first file, so the fd count should remain constant.
+    shared_ptr<TypeParam> f3;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f3));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 2));
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_FALSE(uh.get());
+    }
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile2, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_TRUE(uh.get());
+    }
+  }
+
+  // The descriptors are all out of scope, but the open fds remain in the cache.
+  NO_FATALS(this->AssertFdsAndDescriptors(1, 0));
+
+  // With the cache gone, so are the cached fds.
+  this->cache_.reset();
+  ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
+}
+
+TYPED_TEST(FileCacheTest, TestDeletion) {
+  // Deleting a file that doesn't exist fails with NotFound.
+  ASSERT_TRUE(this->cache_->DeleteFile("/does/not/exist").IsNotFound());
+
+  // Create a test file, then delete it. It will be deleted immediately.
+  const string kFile1 = this->GetTestPath("foo");
+  const string kData1 = "test data 1";
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+  ASSERT_TRUE(this->env_->FileExists(kFile1));
+  ASSERT_OK(this->cache_->DeleteFile(kFile1));
+  ASSERT_FALSE(this->env_->FileExists(kFile1));
+
+  // Trying to delete it again fails.
+  ASSERT_TRUE(this->cache_->DeleteFile(kFile1).IsNotFound());
+
+  // Create another test file, open it, then delete it. The delete is not
+  // effected until the last open descriptor is closed. In between, the
+  // cache won't allow the file to be opened again.
+  const string kFile2 = this->GetTestPath("bar");
+  const string kData2 = "test data 2";
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  ASSERT_TRUE(this->env_->FileExists(kFile2));
+  {
+    shared_ptr<TypeParam> f1;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f1));
+    ASSERT_EQ(this->initial_open_fds_ + 1, CountOpenFds(this->env_));
+    ASSERT_OK(this->cache_->DeleteFile(kFile2));
+    {
+      shared_ptr<TypeParam> f2;
+      ASSERT_TRUE(this->cache_->OpenExistingFile(kFile2, &f2).IsNotFound());
+    }
+    ASSERT_TRUE(this->cache_->DeleteFile(kFile2).IsNotFound());
+    ASSERT_TRUE(this->env_->FileExists(kFile2));
+    ASSERT_EQ(this->initial_open_fds_ + 1, CountOpenFds(this->env_));
+  }
+  ASSERT_FALSE(this->env_->FileExists(kFile2));
+  ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
+
+  // Create a test file, open it, and let it go out of scope before
+  // deleting it. The deletion should evict the fd and close it, despite
+  // happening after the descriptor is gone.
+  const string kFile3 = this->GetTestPath("baz");
+  const string kData3 = "test data 3";
+  ASSERT_OK(this->WriteTestFile(kFile3, kData3));
+  {
+    shared_ptr<TypeParam> f3;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile3, &f3));
+  }
+  ASSERT_TRUE(this->env_->FileExists(kFile3));
+  ASSERT_EQ(this->initial_open_fds_ + 1, CountOpenFds(this->env_));
+  ASSERT_OK(this->cache_->DeleteFile(kFile3));
+  ASSERT_FALSE(this->env_->FileExists(kFile3));
+  ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
+}
+
+TYPED_TEST(FileCacheTest, TestHeavyReads) {
+  const int kNumFiles = 20;
+  const int kNumIterations = 100;
+  const int kCacheCapacity = 5;
+
+  ASSERT_OK(this->ReinitCache(kCacheCapacity));
+
+  // Randomly generate some data.
+  string data;
+  for (int i = 0; i < 1000; i++) {
+    data += Substitute("$0", this->rand_.Next());
+  }
+
+  // Write that data to a bunch of files and open them through the cache.
+  vector<shared_ptr<TypeParam>> opened_files;
+  for (int i = 0; i < kNumFiles; i++) {
+    string filename = this->GetTestPath(Substitute("$0", i));
+    ASSERT_OK(this->WriteTestFile(filename, data));
+    shared_ptr<TypeParam> f;
+    ASSERT_OK(this->cache_->OpenExistingFile(filename, &f));
+    opened_files.push_back(f);
+  }
+
+  // Read back the data at random through the cache.
+  unique_ptr<uint8_t[]> buf(new uint8_t[data.length()]);
+  for (int i = 0; i < kNumIterations; i++) {
+    int idx = this->rand_.Uniform(opened_files.size());
+    const auto& f = opened_files[idx];
+    uint64_t size;
+    ASSERT_OK(f->Size(&size));
+    Slice s;
+    ASSERT_OK(f->Read(0, size, &s, buf.get()));
+    ASSERT_EQ(data, s);
+    ASSERT_LE(CountOpenFds(this->env_),
+              this->initial_open_fds_ + kCacheCapacity);
+  }
+}
+
+TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) {
+  // This test triggered a deadlock in a previous implementation, when expired
+  // weak_ptrs were removed from the descriptor map in the descriptor's
+  // destructor.
+  alarm(60);
+  auto cleanup = MakeScopedCleanup([]() {
+    alarm(0);
+  });
+
+  const string kFile = this->GetTestPath("foo");
+  ASSERT_OK(this->WriteTestFile(kFile, "test data"));
+
+  vector<std::thread> threads;
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&]() {
+      for (int i = 0; i < 10000; i++) {
+        shared_ptr<TypeParam> f;
+        CHECK_OK(this->cache_->OpenExistingFile(kFile, &f));
+      }
+    });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/file_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
new file mode 100644
index 0000000..d277937
--- /dev/null
+++ b/src/kudu/util/file_cache.cc
@@ -0,0 +1,587 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/once.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+DEFINE_int32(file_cache_expiry_period_ms, 60 * 1000,
+             "Period of time (in ms) between removing expired file cache descriptors");
+TAG_FLAG(file_cache_expiry_period_ms, advanced);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+template <class FileType>
+FileType* CacheValueToFileType(Slice s) {  // 's' is a cache value holding a raw pointer.
+  return reinterpret_cast<FileType*>(*reinterpret_cast<void**>(
+      s.mutable_data()));  // Read the stored pointer back out of the value bytes.
+}
+
+template <class FileType>
+class EvictionCallback : public Cache::EvictionCallback {
+ public:
+  EvictionCallback() {}
+
+  void EvictedEntry(Slice key, Slice value) override {  // 'key' is the file name.
+    VLOG(2) << "Evicted fd belonging to " << key.ToString();
+    delete CacheValueToFileType<FileType>(value);  // Frees the heap-allocated file object.
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(EvictionCallback);
+};
+
+} // anonymous namespace
+
+namespace internal {
+
+template <class FileType>
+class ScopedOpenedDescriptor;
+
+// Encapsulates the fields and cache plumbing shared by all descriptor specializations.
+template <class FileType>
+class BaseDescriptor {
+ public:
+  BaseDescriptor(FileCache<FileType>* file_cache,
+                 const string& filename)
+      : file_cache_(file_cache),
+        file_name_(filename) {}
+
+  ~BaseDescriptor() {
+    VLOG(2) << "Out of scope descriptor with file name: " << filename();
+
+    // The (now expired) weak_ptr remains in 'descriptors_', to be removed by
+    // the next call to RunDescriptorExpiry(). Removing it here would risk a
+    // deadlock on recursive acquisition of 'lock_'.
+
+    if (deleted_) {  // Deferred deletion requested through MarkDeleted().
+      cache()->Erase(filename());
+
+      VLOG(1) << "Deleting file: " << filename();
+      WARN_NOT_OK(env()->DeleteFile(filename()), "Could not delete file");
+    }
+  }
+
+  // Insert a pointer to an open file object into the file cache with the
+  // filename as the cache key.
+  //
+  // Returns a handle to the inserted entry. The handle always contains an open
+  // file.
+  ScopedOpenedDescriptor<FileType> InsertIntoCache(void* file_ptr) const {
+    // The allocated charge is always one byte. This is incorrect with respect
+    // to memory tracking, but it's necessary if the cache capacity is to be
+    // equivalent to the max number of fds.
+    Cache::PendingHandle* pending = CHECK_NOTNULL(cache()->Allocate(
+        filename(), sizeof(file_ptr), 1));
+    memcpy(cache()->MutableValue(pending),  // The cache value is the pointer itself.
+           &file_ptr,
+           sizeof(file_ptr));
+    return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
+        cache()->Insert(pending, file_cache_->eviction_cb_.get()),
+        Cache::HandleDeleter(cache())));
+  }
+
+  // Retrieves a pointer to an open file object from the file cache with the
+  // filename as the cache key.
+  //
+  // Returns a handle to the looked up entry. The handle may or may not contain
+  // an open file, depending on whether the cache hit or missed.
+  ScopedOpenedDescriptor<FileType> LookupFromCache() const {
+    return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
+        cache()->Lookup(filename(), Cache::EXPECT_IN_CACHE),
+        Cache::HandleDeleter(cache())));
+  }
+
+  // Mark this descriptor as to-be-deleted later (see the destructor).
+  void MarkDeleted() {
+    DCHECK(!deleted_);
+    deleted_ = true;
+  }
+
+  Cache* cache() const { return file_cache_->cache_.get(); }
+
+  Env* env() const { return file_cache_->env_; }
+
+  const string& filename() const { return file_name_; }
+
+  bool deleted() const { return deleted_; }
+
+ private:
+  FileCache<FileType>* file_cache_;  // Not owned.
+  const string file_name_;
+
+  bool deleted_ = false;  // Set by MarkDeleted(); acted upon in the destructor.
+
+  DISALLOW_COPY_AND_ASSIGN(BaseDescriptor);
+};
+
+// A "smart" retrieved LRU cache handle.
+//
+// The cache handle is released when this object goes out of scope, possibly
+// closing the opened file if it is no longer in the cache.
+template <class FileType>
+class ScopedOpenedDescriptor {
+ public:
+  // A not-yet-but-soon-to-be opened descriptor; starts out with a null handle.
+  explicit ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc)
+      : desc_(desc),
+        handle_(nullptr, Cache::HandleDeleter(desc_->cache())) {
+  }
+
+  // An opened descriptor. Its handle may or may not contain an open file.
+  ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc,
+                         Cache::UniqueHandle handle)
+      : desc_(desc),
+        handle_(std::move(handle)) {
+  }
+
+  bool opened() const { return handle_.get(); }  // True iff the handle is non-null.
+
+  FileType* file() const {  // Only valid when opened().
+    DCHECK(opened());
+    return CacheValueToFileType<FileType>(desc_->cache()->Value(handle_.get()));
+  }
+
+ private:
+  const BaseDescriptor<FileType>* desc_;  // Not owned.
+  Cache::UniqueHandle handle_;
+};
+
+// Reference to an on-disk file that may or may not be opened (and thus
+// cached) in the file cache.
+//
+// This empty template is just a specification; actual descriptor classes must
+// be fully specialized (see the RWFile and RandomAccessFile specializations).
+template <class FileType>
+class Descriptor : public FileType {
+};
+
+// A descriptor adhering to the RWFile interface (i.e. when opened, provides
+// a read-write interface to the underlying file).
+template <>
+class Descriptor<RWFile> : public RWFile {
+ public:
+  Descriptor(FileCache<RWFile>* file_cache, const string& filename)
+      : base_(file_cache, filename) {}
+
+  ~Descriptor() = default;
+
+  Status Read(uint64_t offset, size_t length,
+              Slice* result, uint8_t* scratch) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));  // Same pattern in every method below.
+    return opened.file()->Read(offset, length, result, scratch);
+  }
+
+  Status Write(uint64_t offset, const Slice& data) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Write(offset, data);
+  }
+
+  Status PreAllocate(uint64_t offset, size_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->PreAllocate(offset, length);
+  }
+
+  Status Truncate(uint64_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Truncate(length);
+  }
+
+  Status PunchHole(uint64_t offset, size_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->PunchHole(offset, length);
+  }
+
+  Status Flush(FlushMode mode, uint64_t offset, size_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Flush(mode, offset, length);
+  }
+
+  Status Sync() override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Sync();
+  }
+
+  Status Close() override {
+    // Intentional no-op; actual closing is deferred to LRU cache eviction.
+    return Status::OK();
+  }
+
+  Status Size(uint64_t* size) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Size(size);
+  }
+
+  const string& filename() const override {
+    return base_.filename();
+  }
+
+ private:
+  friend class FileCache<RWFile>;  // Calls Init() and reaches into 'base_'.
+
+  Status Init() {
+    return once_.Init(&Descriptor<RWFile>::InitOnce, this);  // Runs InitOnce() via 'once_'.
+  }
+
+  Status InitOnce() {
+    return ReopenFileIfNecessary(nullptr);  // Opens and caches the file; handle not needed.
+  }
+
+  // Looks the file up in the cache, reopening and reinserting it on a miss.
+  Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
+    ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
+    if (found.opened()) {
+      // The file is already open in the cache, return it.
+      if (out) {
+        *out = std::move(found);
+      }
+      return Status::OK();
+    }
+
+    // The file was evicted, reopen it.
+    //
+    // Because the file may be evicted at any time we must use 'sync_on_close'
+    // (note: sync is a no-op if the file isn't dirty).
+    RWFileOptions opts;
+    opts.sync_on_close = true;
+    opts.mode = Env::OPEN_EXISTING;
+    unique_ptr<RWFile> f;
+    RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));
+
+    // The cache will take ownership of the newly opened file.
+    ScopedOpenedDescriptor<RWFile> opened(base_.InsertIntoCache(f.release()));
+    if (out) {
+      *out = std::move(opened);
+    }
+    return Status::OK();
+  }
+
+  BaseDescriptor<RWFile> base_;
+  KuduOnceDynamic once_;
+
+  DISALLOW_COPY_AND_ASSIGN(Descriptor);
+};
+
+// A descriptor adhering to the RandomAccessFile interface (i.e. when opened,
+// provides a read-only interface to the underlying file).
+template <>
+class Descriptor<RandomAccessFile> : public RandomAccessFile {
+ public:
+  Descriptor(FileCache<RandomAccessFile>* file_cache, const string& filename)
+      : base_(file_cache, filename) {}
+
+  ~Descriptor() = default;
+
+  Status Read(uint64_t offset, size_t n,
+              Slice* result, uint8_t *scratch) const override {
+    ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));  // Reopens the file on a cache miss.
+    return opened.file()->Read(offset, n, result, scratch);
+  }
+
+  Status Size(uint64_t *size) const override {
+    ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Size(size);
+  }
+
+  const string& filename() const override {
+    return base_.filename();
+  }
+
+  size_t memory_footprint() const override {  // Covers the descriptor, not the cached file.
+    return kudu_malloc_usable_size(this) +
+        once_.memory_footprint_excluding_this() +
+        base_.filename().capacity();
+  }
+
+ private:
+  friend class FileCache<RandomAccessFile>;  // Calls Init() and reaches into 'base_'.
+
+  Status Init() {
+    return once_.Init(&Descriptor<RandomAccessFile>::InitOnce, this);  // Runs InitOnce().
+  }
+
+  Status InitOnce() {
+    return ReopenFileIfNecessary(nullptr);  // Opens and caches the file; handle not needed.
+  }
+
+  Status ReopenFileIfNecessary(
+      ScopedOpenedDescriptor<RandomAccessFile>* out) const {
+    ScopedOpenedDescriptor<RandomAccessFile> found(base_.LookupFromCache());
+    if (found.opened()) {
+      // The file is already open in the cache, return it.
+      if (out) {
+        *out = std::move(found);
+      }
+      return Status::OK();
+    }
+
+    // The file was evicted (or never opened), reopen it.
+    unique_ptr<RandomAccessFile> f;
+    RETURN_NOT_OK(base_.env()->NewRandomAccessFile(base_.filename(), &f));
+
+    // The cache will take ownership of the newly opened file.
+    ScopedOpenedDescriptor<RandomAccessFile> opened(
+        base_.InsertIntoCache(f.release()));
+    if (out) {
+      *out = std::move(opened);
+    }
+    return Status::OK();
+  }
+
+  BaseDescriptor<RandomAccessFile> base_;
+  KuduOnceDynamic once_;
+
+  DISALLOW_COPY_AND_ASSIGN(Descriptor);
+};
+
+} // namespace internal
+
+template <class FileType>
+FileCache<FileType>::FileCache(const string& cache_name,
+                               Env* env,
+                               int max_open_files,
+                               const scoped_refptr<MetricEntity>& entity)
+    : env_(env),
+      cache_name_(cache_name),
+      eviction_cb_(new EvictionCallback<FileType>()),  // Deletes evicted file objects.
+      cache_(NewLRUCache(DRAM_CACHE, max_open_files, cache_name)),
+      running_(1) {  // Counted down in the destructor to stop the expiry thread.
+  if (entity) {  // Metrics are only attached when an entity is supplied.
+    cache_->SetMetrics(entity);
+  }
+  LOG(INFO) << Substitute("Constructed file cache $0 with capacity $1",
+                          cache_name, max_open_files);
+}
+
+template <class FileType>
+FileCache<FileType>::~FileCache() {
+  running_.CountDown();  // Tells RunDescriptorExpiry() to stop looping.
+  if (descriptor_expiry_thread_) {  // Only set once Init() has created the thread.
+    descriptor_expiry_thread_->Join();
+  }
+}
+
+template <class FileType>
+Status FileCache<FileType>::Init() {
+  return Thread::Create("cache", Substitute("$0-evict", cache_name_),
+                        &FileCache::RunDescriptorExpiry, this,  // Thread body.
+                        &descriptor_expiry_thread_);  // Joined in the destructor.
+}
+
+template <class FileType>
+Status FileCache<FileType>::OpenExistingFile(const string& file_name,
+                                             shared_ptr<FileType>* file) {
+  shared_ptr<internal::Descriptor<FileType>> desc;
+  {
+    // Find an existing descriptor, or create one if none exists.
+    std::lock_guard<simple_spinlock> l(lock_);
+    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));  // NotFound if marked deleted.
+    if (desc) {
+      VLOG(2) << "Found existing descriptor: " << desc->filename();
+    } else {
+      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
+      InsertOrDie(&descriptors_, file_name, desc);  // The map keeps only a weak_ptr.
+      VLOG(2) << "Created new descriptor: " << desc->filename();
+    }
+  }
+
+  // Check that the underlying file can be opened (no-op for found
+  // descriptors). Done outside the lock.
+  RETURN_NOT_OK(desc->Init());
+  *file = desc;  // The caller shares ownership of the descriptor.
+  return Status::OK();
+}
+
+template <class FileType>
+Status FileCache<FileType>::DeleteFile(const string& file_name) {
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    shared_ptr<internal::Descriptor<FileType>> desc;
+    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));  // NotFound if already marked.
+
+    if (desc) {
+      VLOG(2) << "Marking file for deletion: " << file_name;
+      desc->base_.MarkDeleted();
+      return Status::OK();  // The file itself is removed in ~BaseDescriptor().
+    }
+  }
+
+  // There is no outstanding descriptor. Delete the file now.
+  //
+  // Make sure it's been fully evicted from the cache (perhaps it was opened
+  // previously?) so that the filesystem can reclaim the file data instantly.
+  cache_->Erase(file_name);
+  return env_->DeleteFile(file_name);
+}
+
+template <class FileType>
+int FileCache<FileType>::NumDescriptorsForTests() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return descriptors_.size();  // Counts expired entries that haven't been purged yet, too.
+}
+
+template <class FileType>
+string FileCache<FileType>::ToDebugString() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  string ret;
+  for (const auto& e : descriptors_) {  // One output line per descriptor map entry.
+    bool strong = false;
+    bool deleted = false;
+    bool opened = false;
+    shared_ptr<internal::Descriptor<FileType>> desc = e.second.lock();
+    if (desc) {  // The descriptor is still referenced by someone.
+      strong = true;
+      if (desc->base_.deleted()) {
+        deleted = true;
+      }
+      internal::ScopedOpenedDescriptor<FileType> o(
+          desc->base_.LookupFromCache());
+      if (o.opened()) {
+        opened = true;
+      }
+    }
+    if (strong) {
+      ret += Substitute("$0 (S$1$2)\n", e.first,  // S = live descriptor.
+                        deleted ? "D" : "", opened ? "O" : "");  // D = deleted, O = open.
+    } else {
+      ret += Substitute("$0\n", e.first);  // Expired descriptor: file name only.
+    }
+  }
+  return ret;
+}
+
+template <class FileType>
+Status FileCache<FileType>::FindDescriptorUnlocked(
+    const string& file_name,
+    shared_ptr<internal::Descriptor<FileType>>* file) {
+  DCHECK(lock_.is_locked());
+
+  auto it = descriptors_.find(file_name);
+  if (it != descriptors_.end()) {
+    // Found the descriptor. Has it expired?
+    shared_ptr<internal::Descriptor<FileType>> desc = it->second.lock();
+    if (desc) {
+      if (desc->base_.deleted()) {
+        return Status::NotFound("File already marked for deletion", file_name);
+      }
+
+      // Descriptor is still valid, return it.
+      if (file) {
+        *file = desc;
+      }
+      return Status::OK();
+    }
+    // Descriptor has expired; erase it and pretend we found nothing.
+    descriptors_.erase(it);
+  }
+  return Status::OK();  // OK with '*file' left untouched means no live descriptor exists.
+}
+
+template <class FileType>
+void FileCache<FileType>::RunDescriptorExpiry() {
+  while (!running_.WaitFor(MonoDelta::FromMilliseconds(
+      FLAGS_file_cache_expiry_period_ms))) {  // Presumably true only after CountDown().
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (auto it = descriptors_.begin(); it != descriptors_.end();) {
+      if (it->second.expired()) {  // No live descriptor behind this weak_ptr anymore.
+        it = descriptors_.erase(it);
+      } else {
+        it++;
+      }
+    }
+  }
+}
+
+// Explicit instantiations for callers outside this compilation unit.
+template
+FileCache<RWFile>::FileCache(
+    const string& cache_name,
+    Env* env,
+    int max_open_files,
+    const scoped_refptr<MetricEntity>& entity);
+template
+FileCache<RWFile>::~FileCache();
+template
+Status FileCache<RWFile>::Init();
+template
+Status FileCache<RWFile>::OpenExistingFile(
+    const string& file_name,
+    shared_ptr<RWFile>* file);
+template
+Status FileCache<RWFile>::DeleteFile(const string& file_name);
+template
+int FileCache<RWFile>::NumDescriptorsForTests() const;
+template
+string FileCache<RWFile>::ToDebugString() const;
+
+template
+FileCache<RandomAccessFile>::FileCache(
+    const string& cache_name,
+    Env* env,
+    int max_open_files,
+    const scoped_refptr<MetricEntity>& entity);
+template
+FileCache<RandomAccessFile>::~FileCache();
+template
+Status FileCache<RandomAccessFile>::Init();
+template
+Status FileCache<RandomAccessFile>::OpenExistingFile(
+    const string& file_name,
+    shared_ptr<RandomAccessFile>* file);
+template
+Status FileCache<RandomAccessFile>::DeleteFile(const string& file_name);
+template
+int FileCache<RandomAccessFile>::NumDescriptorsForTests() const;
+template
+string FileCache<RandomAccessFile>::ToDebugString() const;
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/file_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache.h b/src/kudu/util/file_cache.h
new file mode 100644
index 0000000..3ef87ad
--- /dev/null
+++ b/src/kudu/util/file_cache.h
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace internal {
+
+template <class FileType>
+class BaseDescriptor;
+
+template <class FileType>
+class Descriptor;
+
+} // namespace internal
+
+class MetricEntity;
+class Thread;
+
+// Cache of open files.
+//
+// The purpose of this cache is to enforce an upper bound on the maximum number
+// of files open at a time. Files opened through the cache may be closed at any
+// time, only to be reopened upon next use.
+//
+// The file cache can be viewed as having two logical parts: the client-facing
+// API and the LRU cache.
+//
+// Client-facing API
+// -----------------
+// The core of the client-facing API is the cache descriptor. A descriptor
+// uniquely identifies an opened file. To a client, a descriptor is just an
+// open file interface of the variety defined in util/env.h. Clients open
+// descriptors via the OpenExistingFile() cache method.
+//
+// Descriptors are shared objects; an existing descriptor is handed back to a
+// client if a file with the same name is already opened. To facilitate
+// descriptor sharing, the file cache maintains a by-file-name descriptor map.
+// The values are weak references to the descriptors so that map entries don't
+// affect the descriptor lifecycle.
+//
+// LRU cache
+// ---------
+// The lower half of the file cache is a standard LRU cache whose keys are file
+// names and whose values are pointers to opened file objects allocated on the
+// heap. Unlike the descriptor map, this cache has an upper bound on capacity,
+// and handles are evicted (and closed) according to an LRU algorithm.
+//
+// Whenever a descriptor is used by a client in file I/O, its file name is used
+// in an LRU cache lookup. If found, the underlying file is still open and the
+// file access is performed. Otherwise, the file must have been evicted and
+// closed, so it is reopened and reinserted (possibly evicting a different open
+// file) before the file access is performed.
+//
+// Other notes
+// -----------
+// In a world where files are opened and closed transparently, file deletion
+// demands special care if UNIX semantics are to be preserved. When a call to
+// DeleteFile() is made to a file with an opened descriptor, the descriptor is
+// simply "marked" as to-be-deleted-later. Only when all references to the
+// descriptor are dropped is the file actually deleted. If there is no open
+// descriptor, the file is deleted immediately.
+//
+// Every public method in the file cache is thread safe.
+template <class FileType>
+class FileCache {
+ public:
+  // Creates a new file cache.
+  //
+  // The 'cache_name' is used to disambiguate amongst other file cache
+  // instances. The cache will use 'max_open_files' as a soft upper bound on
+  // the number of files open at any given time.
+  FileCache(const std::string& cache_name,
+            Env* env,
+            int max_open_files,
+            const scoped_refptr<MetricEntity>& entity);
+
+  // Destroys the file cache, stopping and joining the descriptor expiry thread.
+  ~FileCache();
+
+  // Initializes the file cache by starting the descriptor expiry thread; may fail.
+  Status Init();
+
+  // Opens an existing file by name through the cache.
+  //
+  // The returned 'file' is actually an object called a descriptor. It adheres
+  // to a file-like interface but interfaces with the cache under the hood to
+  // reopen a file as needed during file operations.
+  //
+  // The descriptor is opened immediately to verify that the on-disk file can
+  // be opened, but may be closed later if the cache reaches its upper bound on
+  // the number of open files.
+  Status OpenExistingFile(const std::string& file_name,
+                          std::shared_ptr<FileType>* file);
+
+  // Deletes a file by name through the cache.
+  //
+  // If there is an outstanding descriptor for the file, the deletion will be
+  // deferred until the last referent is dropped. Otherwise, the file is
+  // deleted immediately.
+  Status DeleteFile(const std::string& file_name);
+
+  // Returns the number of entries in the descriptor map.
+  //
+  // Only intended for unit tests.
+  int NumDescriptorsForTests() const;
+
+  // Dumps the contents of the file cache. Intended for debugging.
+  std::string ToDebugString() const;
+
+ private:
+  friend class internal::BaseDescriptor<FileType>;
+
+  template<class FileType2>
+  FRIEND_TEST(FileCacheTest, TestBasicOperations);
+
+  // Looks up a descriptor by file name.
+  //
+  // Must be called with 'lock_' held.
+  Status FindDescriptorUnlocked(
+      const std::string& file_name,
+      std::shared_ptr<internal::Descriptor<FileType>>* file);
+
+  // Periodically removes expired descriptors from 'descriptors_'.
+  void RunDescriptorExpiry();
+
+  // Interface to the underlying filesystem.
+  Env* env_;
+
+  // Name of the cache.
+  const std::string cache_name_;
+
+  // Invoked whenever a cached file reaches zero references (i.e. it was
+  // removed from the cache and is no longer in use by any file operations).
+  std::unique_ptr<Cache::EvictionCallback> eviction_cb_;
+
+  // Underlying cache instance. Caches opened files.
+  std::unique_ptr<Cache> cache_;
+
+  // Protects the descriptor map.
+  mutable simple_spinlock lock_;
+
+  // Maps filenames to descriptors.
+  std::unordered_map<std::string,
+                     std::weak_ptr<internal::Descriptor<FileType>>> descriptors_;
+
+  // Runs RunDescriptorExpiry(), which loops until 'running_' is counted down.
+  scoped_refptr<Thread> descriptor_expiry_thread_;
+
+  // Tracks whether or not 'descriptor_expiry_thread_' should be running.
+  CountDownLatch running_;
+
+  DISALLOW_COPY_AND_ASSIGN(FileCache);
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/test_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/test_util.cc b/src/kudu/util/test_util.cc
index e143a42..4027872 100644
--- a/src/kudu/util/test_util.cc
+++ b/src/kudu/util/test_util.cc
@@ -16,8 +16,12 @@
 // under the License.
 #include "kudu/util/test_util.h"
 
+#include <string>
+#include <vector>
+
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <glog/stl_logging.h>
 #include <gtest/gtest-spi.h>
 
 #include "kudu/gutil/strings/strcat.h"
@@ -36,6 +40,7 @@ DEFINE_string(test_leave_files, "on_failure",
 DEFINE_int32(test_random_seed, 0, "Random seed to use for randomized tests");
 
 using std::string;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
@@ -231,4 +236,27 @@ void AssertEventually(const std::function<void(void)>& f,
   }
 }
 
+int CountOpenFds(Env* env) {
+  static const char* kProcSelfFd =  // Directory listing this process' open fds.
+#if defined(__APPLE__)
+    "/dev/fd";
+#else
+    "/proc/self/fd";
+#endif // defined(__APPLE__)
+
+  vector<string> children;
+  CHECK_OK(env->GetChildren(kProcSelfFd, &children));
+  int num_fds = 0;
+  for (const auto& c : children) {
+    // Skip '.' and '..'.
+    if (c == "." || c == "..") {
+      continue;
+    }
+    num_fds++;
+  }
+
+  // Exclude the fd opened to iterate over kProcSelfFd.
+  return num_fds - 1;
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a124f04/src/kudu/util/test_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/test_util.h b/src/kudu/util/test_util.h
index bb48ee1..9ff9275 100644
--- a/src/kudu/util/test_util.h
+++ b/src/kudu/util/test_util.h
@@ -97,5 +97,8 @@ std::string GetTestDataDirectory();
 void AssertEventually(const std::function<void(void)>& f,
                       const MonoDelta& timeout = MonoDelta::FromSeconds(30));
 
+// Returns the number of file descriptors currently open in this process (listed via 'env').
+int CountOpenFds(Env* env);
+
 } // namespace kudu
 #endif


[2/2] kudu git commit: block manager: start using the file cache

Posted by ad...@apache.org.
block manager: start using the file cache

This commit integrates the file cache into both the file and log block
managers. The capacity of the cache is determined at runtime by inspecting
the RLIMIT_NOFILE resource limit; because the cache doesn't manage all open
files, we use a rather conservative 50% of the limit. This can be overridden
with a command line flag, which can also be used to disable file caching
altogether.

Other changes of note:
- Unlike the FBM, the LBM creates and opens container files for read/write
  in the same operation. Since that kind of behavior isn't supported by the
  file cache, we close the files after creating them, then reopen them
  through the cache. While inelegant, I don't expect this to be problematic.
- block_manager-stress-test now uses a PeriodicOpenFdChecker to make sure
  the file cache is working correctly. Some of the test behavior was tweaked
  to increase the number of blocks, and to avoid violating the semantics of
  the file cache.
- BlockManagerTest.CloseManyBlocksTest now uses 1000 blocks, which is enough
  for it to fail with the FBM sans file cache when the process resource
  limit is 1024 open files (the default on my Ubuntu 16.04 installation).
- When constructing a block manager, we first try to increase the
  RLIMIT_NOFILE soft limit for the process to be equal to the hard limit. On
  many systems there's enough of a gap between them that this can add a lot
  of cache capacity "for free".

Change-Id: Ieeefd31eca340111bc535eac1f982290e7703a88
Reviewed-on: http://gerrit.cloudera.org:8080/5147
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3fa9db76
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3fa9db76
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3fa9db76

Branch: refs/heads/master
Commit: 3fa9db761e35210a9e83b0be279bc44c95f6ccbf
Parents: 6a124f0
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Nov 17 01:41:41 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Dec 7 21:46:29 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-stress-test.cc | 159 ++++++++++++++++----------
 src/kudu/fs/block_manager-test.cc        |  35 +++---
 src/kudu/fs/block_manager.cc             |  40 +++++++
 src/kudu/fs/block_manager.h              |  13 ++-
 src/kudu/fs/file_block_manager.cc        |  30 ++++-
 src/kudu/fs/file_block_manager.h         |   6 +
 src/kudu/fs/log_block_manager.cc         |  89 ++++++++++----
 src/kudu/fs/log_block_manager.h          |   9 +-
 src/kudu/rpc/rpc-test.cc                 |  10 +-
 src/kudu/util/env.h                      |   9 ++
 src/kudu/util/env_posix.cc               |  26 +++++
 src/kudu/util/pb_util.cc                 |   4 +-
 src/kudu/util/pb_util.h                  |   8 +-
 13 files changed, 317 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index f5c6353..7d591ca 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <cmath>
-#include <memory>
 #include <mutex>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "kudu/fs/file_block_manager.h"
@@ -27,27 +28,33 @@
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/atomic.h"
+#include "kudu/util/file_cache-test-util.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/random.h"
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+DECLARE_int64(block_manager_max_open_files);
+DECLARE_uint64(log_container_max_size);
+DECLARE_uint64(log_container_preallocate_bytes);
+DECLARE_bool(never_fsync);
+
 DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
 DEFINE_int32(num_writer_threads, 4, "Number of writer threads to run");
 DEFINE_int32(num_reader_threads, 8, "Number of reader threads to run");
 DEFINE_int32(num_deleter_threads, 1, "Number of deleter threads to run");
 DEFINE_int32(block_group_size, 8, "Number of blocks to write per block "
              "group. Must be power of 2");
-DEFINE_int32(block_group_bytes, 64 * 1024,
+DEFINE_int32(block_group_bytes, 32 * 1024,
              "Total amount of data (in bytes) to write per block group");
-DEFINE_int32(num_bytes_per_write, 64,
+DEFINE_int32(num_bytes_per_write, 32,
              "Number of bytes to write at a time");
 DEFINE_string(block_manager_paths, "", "Comma-separated list of paths to "
               "use for block storage. If empty, will use the default unit "
               "test path");
 
-using std::shared_ptr;
 using std::string;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -59,7 +66,7 @@ namespace fs {
 // writing threads (default 2) that do the following in a tight loop:
 // - create a new group of blocks (default 10)
 // - write a PRNG seed into each block
-// - write a big chunk of data (default 64m) into the block group:
+// - write a big chunk of data (default 32k) into the block group:
 //   - pick the next block to write a piece to at random
 //   - write one piece at a time (default 64k) of data generated using
 //     that block's PRNG seed
@@ -70,22 +77,33 @@ namespace fs {
 // - read the block fully into memory, parsing its seed
 // - verify that the contents of the block match the PRNG output
 // deleting threads (default 1) that do the following every second:
-// - drain the block_id vector(write locked)
+// - remove one random block with no openers from written_blocks_ (locked)
 // - delete all the blocks drained from the vector
-//
-// TODO: Don't delete all blocks ala "permgen".
 template <typename T>
 class BlockManagerStressTest : public KuduTest {
  public:
   BlockManagerStressTest() :
     rand_seed_(SeedRandom()),
     stop_latch_(1),
-    bm_(CreateBlockManager()),
     total_blocks_written_(0),
     total_bytes_written_(0),
     total_blocks_read_(0),
     total_bytes_read_(0),
     total_blocks_deleted_(0) {
+
+    // Increases total number of blocks manipulated, which is the right kind
+    // of stress for this test.
+    FLAGS_never_fsync = true;
+
+    // Increase the number of containers created.
+    FLAGS_log_container_max_size = 1 * 1024 * 1024;
+    FLAGS_log_container_preallocate_bytes = 1 * 1024 * 1024;
+
+    // Ensure the file cache is under stress too.
+    FLAGS_block_manager_max_open_files = 512;
+
+    // Defer block manager creation until after the above flags are set.
+    bm_.reset(CreateBlockManager());
   }
 
   virtual void SetUp() OVERRIDE {
@@ -164,6 +182,8 @@ class BlockManagerStressTest : public KuduTest {
   void ReaderThread();
   void DeleterThread();
 
+  int GetMaxFdCount() const;
+
  protected:
   // Used to generate random data. All PRNG instances are seeded with this
   // value to ensure that the test is reproducible.
@@ -173,10 +193,13 @@ class BlockManagerStressTest : public KuduTest {
   CountDownLatch stop_latch_;
 
   // Tracks blocks that have been synced and are ready to be read/deleted.
-  vector<BlockId> written_blocks_;
+  //
+  // Each entry is a block id and the number of in-progress openers. To delete
+  // a block, there must be no openers.
+  unordered_map<BlockId, int, BlockIdHash> written_blocks_;
 
   // Protects written_blocks_.
-  rw_spinlock lock_;
+  simple_spinlock lock_;
 
   // The block manager.
   gscoped_ptr<BlockManager> bm_;
@@ -197,9 +220,6 @@ class BlockManagerStressTest : public KuduTest {
 
 template <typename T>
 void BlockManagerStressTest<T>::WriterThread() {
-  string thread_name = Thread::current_thread()->name();
-  LOG(INFO) << "Thread " << thread_name << " starting";
-
   Random rand(rand_seed_);
   size_t num_blocks_written = 0;
   size_t num_bytes_written = 0;
@@ -216,7 +236,6 @@ void BlockManagerStressTest<T>::WriterThread() {
 
       const uint32_t seed = rand.Next() + 1;
       Slice seed_slice(reinterpret_cast<const uint8_t*>(&seed), sizeof(seed));
-      LOG(INFO) << "Creating block " << block->id().ToString() << " with seed " << seed;
       CHECK_OK(block->Append(seed_slice));
 
       dirty_blocks.push_back(block.release());
@@ -227,7 +246,6 @@ void BlockManagerStressTest<T>::WriterThread() {
     //
     // To emulate a real life workload, we pick the next block to write at
     // random, and write a smaller chunk of data to it.
-    LOG(INFO) << "Writing " << FLAGS_block_group_bytes << " bytes into new blocks";
     size_t total_dirty_bytes = 0;
     while (total_dirty_bytes < FLAGS_block_group_bytes) {
       // Pick the next block.
@@ -249,65 +267,65 @@ void BlockManagerStressTest<T>::WriterThread() {
     //
     // We could close them implicitly when the blocks are destructed but
     // this way we can check for errors.
-    LOG(INFO) << "Closing new blocks";
     CHECK_OK(bm_->CloseBlocks(dirty_blocks));
 
     // Publish the now sync'ed blocks to readers and deleters.
     {
-      std::lock_guard<rw_spinlock> l(lock_);
+      std::lock_guard<simple_spinlock> l(lock_);
       for (WritableBlock* block : dirty_blocks) {
-        written_blocks_.push_back(block->id());
+        InsertOrDie(&written_blocks_, block->id(), 0);
       }
     }
     num_blocks_written += dirty_blocks.size();
     num_bytes_written += total_dirty_bytes;
   }
 
-  LOG(INFO) << Substitute("Thread $0 stopping. Wrote $1 blocks ($2 bytes)",
-                          thread_name, num_blocks_written, num_bytes_written);
   total_blocks_written_.IncrementBy(num_blocks_written);
   total_bytes_written_.IncrementBy(num_bytes_written);
 }
 
 template <typename T>
 void BlockManagerStressTest<T>::ReaderThread() {
-  string thread_name = Thread::current_thread()->name();
-  LOG(INFO) << "Thread " << thread_name << " starting";
-
   Random rand(rand_seed_);
   size_t num_blocks_read = 0;
   size_t num_bytes_read = 0;
   MonoDelta tight_loop(MonoDelta::FromSeconds(0));
   while (!ShouldStop(tight_loop)) {
-    gscoped_ptr<ReadableBlock> block;
+    BlockId block_id;
     {
       // Grab a block at random.
-      shared_lock<rw_spinlock> l(lock_);
-      size_t num_blocks = written_blocks_.size();
-      if (num_blocks > 0) {
-        uint32_t next_id = rand.Uniform(num_blocks);
-        const BlockId& block_id = written_blocks_[next_id];
-        CHECK_OK(bm_->OpenBlock(block_id, &block));
+      std::lock_guard<simple_spinlock> l(lock_);
+      if (written_blocks_.empty()) {
+        continue;
       }
+
+      auto it = written_blocks_.begin();
+      std::advance(it, rand.Uniform(written_blocks_.size()));
+      block_id = it->first;
+      it->second++;
     }
-    if (!block) {
-      continue;
+
+    gscoped_ptr<ReadableBlock> block;
+    CHECK_OK(bm_->OpenBlock(block_id, &block));
+
+    // Done opening the block, make it available for deleting.
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
+      int& openers = FindOrDie(written_blocks_, block_id);
+      openers--;
     }
 
     // Read it fully into memory.
-    string block_id = block->id().ToString();
     uint64_t block_size;
     CHECK_OK(block->Size(&block_size));
     Slice data;
     gscoped_ptr<uint8_t[]> scratch(new uint8_t[block_size]);
     CHECK_OK(block->Read(0, block_size, &data, scratch.get()));
-    LOG(INFO) << "Read " << block_size << " bytes from block " << block_id;
 
     // The first 4 bytes correspond to the PRNG seed.
     CHECK(data.size() >= 4);
     uint32_t seed;
     memcpy(&seed, data.data(), sizeof(uint32_t));
-    LOG(INFO) << "Read seed " << seed << " from block " << block_id;
     Random rand(seed);
 
     // Verify every subsequent number using the PRNG.
@@ -325,45 +343,66 @@ void BlockManagerStressTest<T>::ReaderThread() {
       }
     }
     CHECK_EQ(bytes_processed, data.size());
-    LOG(INFO) << "Finished reading block " << block->id().ToString();
     num_blocks_read++;
     num_bytes_read += block_size;
   }
 
-  LOG(INFO) << Substitute("Thread $0 stopping. Read $1 blocks ($2 bytes)",
-                          thread_name, num_blocks_read, num_bytes_read);
   total_blocks_read_.IncrementBy(num_blocks_read);
   total_bytes_read_.IncrementBy(num_bytes_read);
 }
 
 template <typename T>
 void BlockManagerStressTest<T>::DeleterThread() {
-  string thread_name = Thread::current_thread()->name();
-  LOG(INFO) << "Thread " << thread_name << " starting";
-
+  Random rand(rand_seed_);
   size_t num_blocks_deleted = 0;
-  MonoDelta sleep_time(MonoDelta::FromSeconds(1));
-  while (!ShouldStop(sleep_time)) {
-    // Grab all the blocks we can.
-    vector<BlockId> to_delete;
+  MonoDelta tight_loop(MonoDelta::FromSeconds(0));
+  while (!ShouldStop(tight_loop)) {
+    // Grab a block at random.
+    BlockId to_delete;
     {
-      std::lock_guard<rw_spinlock> l(lock_);
-      to_delete.swap(written_blocks_);
-    }
+      std::lock_guard<simple_spinlock> l(lock_);
+      if (written_blocks_.empty()) {
+        continue;
+      }
+
+      auto it = written_blocks_.begin();
+      std::advance(it, rand.Uniform(written_blocks_.size()));
+      if (it->second > 0) {
+        continue;
+      }
 
-    // And delete them.
-    for (const BlockId& block_id : to_delete) {
-      LOG(INFO) << "Deleting block " << block_id.ToString();
-      CHECK_OK(bm_->DeleteBlock(block_id));
+      to_delete = it->first;
+      written_blocks_.erase(it);
     }
-    num_blocks_deleted += to_delete.size();
+
+    // And delete it.
+    CHECK_OK(bm_->DeleteBlock(to_delete));
+    num_blocks_deleted++;
   }
 
-  LOG(INFO) << Substitute("Thread $0 stopping. Deleted $1 blocks",
-                          thread_name, num_blocks_deleted);
   total_blocks_deleted_.IncrementBy(num_blocks_deleted);
 }
 
+template <>
+int BlockManagerStressTest<FileBlockManager>::GetMaxFdCount() const {
+  return FLAGS_block_manager_max_open_files +
+      // Each open block exists outside the file cache.
+      (FLAGS_num_writer_threads * FLAGS_block_group_size) +
+      // Each reader thread can open a file outside the cache if its lookup
+      // misses. It'll immediately evict an existing fd, but in that brief
+      // window of time both fds may be open simultaneously.
+      FLAGS_num_reader_threads;
+}
+
+template <>
+int BlockManagerStressTest<LogBlockManager>::GetMaxFdCount() const {
+  return FLAGS_block_manager_max_open_files +
+      // If all containers are full, each open block could theoretically
+      // result in a new container, which is two files briefly outside the
+      // cache (before they are inserted and evict other cached files).
+      (FLAGS_num_writer_threads * FLAGS_block_group_size * 2);
+}
+
 // What kinds of BlockManagers are supported?
 #if defined(__linux__)
 typedef ::testing::Types<FileBlockManager, LogBlockManager> BlockManagers;
@@ -375,22 +414,22 @@ TYPED_TEST_CASE(BlockManagerStressTest, BlockManagers);
 TYPED_TEST(BlockManagerStressTest, StressTest) {
   OverrideFlagForSlowTests("test_duration_secs", "30");
   OverrideFlagForSlowTests("block_group_size", "16");
-  OverrideFlagForSlowTests("block_group_bytes",
-                           Substitute("$0", 64 * 1024 * 1024));
-  OverrideFlagForSlowTests("num_bytes_per_write",
-                           Substitute("$0", 64 * 1024));
 
   if ((FLAGS_block_group_size & (FLAGS_block_group_size - 1)) != 0) {
     LOG(FATAL) << "block_group_size " << FLAGS_block_group_size
                << " is not a power of 2";
   }
 
+  PeriodicOpenFdChecker checker(this->env_, this->GetMaxFdCount());
+
   LOG(INFO) << "Running on fresh block manager";
+  checker.Start();
   this->RunTest(FLAGS_test_duration_secs / 2);
   LOG(INFO) << "Running on populated block manager";
   this->bm_.reset(this->CreateBlockManager());
   ASSERT_OK(this->bm_->Open());
   this->RunTest(FLAGS_test_duration_secs / 2);
+  checker.Stop();
 
   LOG(INFO) << "Printing test totals";
   LOG(INFO) << "--------------------";

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 5209156..2e19afd 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -42,12 +42,6 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
-// LogBlockManager opens two files per container, and CloseManyBlocksTest
-// uses one container for each block. To simplify testing (i.e. no need to
-// raise the ulimit on open files), the default is kept low.
-DEFINE_int32(num_blocks_close, 500,
-             "Number of blocks to simultaneously close in CloseManyBlocksTest");
-
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 
@@ -468,6 +462,8 @@ TYPED_TEST(BlockManagerTest, CloseTwiceTest) {
 }
 
 TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
+  const int kNumBlocks = 1000;
+
   if (!AllowSlowTests()) {
     LOG(INFO) << "Not running in slow-tests mode";
     return;
@@ -479,23 +475,24 @@ TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
   Random rand(SeedRandom());
   vector<WritableBlock*> dirty_blocks;
   ElementDeleter deleter(&dirty_blocks);
-  LOG(INFO) << "Creating " <<  FLAGS_num_blocks_close << " blocks";
-  for (int i = 0; i < FLAGS_num_blocks_close; i++) {
-    // Create a block.
-    gscoped_ptr<WritableBlock> written_block;
-    ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  LOG_TIMING(INFO, Substitute("creating $0 blocks", kNumBlocks)) {
+    for (int i = 0; i < kNumBlocks; i++) {
+      // Create a block.
+      gscoped_ptr<WritableBlock> written_block;
+      ASSERT_OK(this->bm_->CreateBlock(&written_block));
+
+      // Write 64k bytes of random data into it.
+      uint8_t data[65536];
+      for (int i = 0; i < sizeof(data); i += sizeof(uint32_t)) {
+        data[i] = rand.Next();
+      }
+      written_block->Append(Slice(data, sizeof(data)));
 
-    // Write 64k bytes of random data into it.
-    uint8_t data[65536];
-    for (int i = 0; i < sizeof(data); i += sizeof(uint32_t)) {
-      data[i] = rand.Next();
+      dirty_blocks.push_back(written_block.release());
     }
-    written_block->Append(Slice(data, sizeof(data)));
-
-    dirty_blocks.push_back(written_block.release());
   }
 
-  LOG_TIMING(INFO, Substitute("closing $0 blocks", FLAGS_num_blocks_close)) {
+  LOG_TIMING(INFO, Substitute("closing $0 blocks", kNumBlocks)) {
     ASSERT_OK(this->bm_->CloseBlocks(dirty_blocks));
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.cc b/src/kudu/fs/block_manager.cc
index 7f921ae..40f1cb0 100644
--- a/src/kudu/fs/block_manager.cc
+++ b/src/kudu/fs/block_manager.cc
@@ -16,6 +16,14 @@
 // under the License.
 
 #include "kudu/fs/block_manager.h"
+
+#include <mutex>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/metrics.h"
 
@@ -28,6 +36,15 @@ DEFINE_bool(block_manager_lock_dirs, true,
             "Note that read-only concurrent usage is still allowed.");
 TAG_FLAG(block_manager_lock_dirs, unsafe);
 
+DEFINE_int64(block_manager_max_open_files, -1,
+             "Maximum number of open file descriptors to be used for data "
+             "blocks. If 0, there is no limit. If -1, Kudu will use half of "
+             "its resource limit as per getrlimit(). This is a soft limit.");
+TAG_FLAG(block_manager_max_open_files, advanced);
+TAG_FLAG(block_manager_max_open_files, evolving);
+
+using strings::Substitute;
+
 namespace kudu {
 namespace fs {
 
@@ -38,5 +55,28 @@ BlockManagerOptions::BlockManagerOptions()
 BlockManagerOptions::~BlockManagerOptions() {
 }
 
+int64_t GetFileCacheCapacityForBlockManager(Env* env) {
+  // Maximize this process' open file limit first, if possible.
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+    env->IncreaseOpenFileLimit();
+  });
+
+  // See block_manager_max_open_files.
+  if (FLAGS_block_manager_max_open_files == -1) {
+    return env->GetOpenFileLimit() / 2;
+  }
+  if (FLAGS_block_manager_max_open_files == 0) {
+    return kint64max;
+  }
+  int64_t file_limit = env->GetOpenFileLimit();
+  LOG_IF(FATAL, FLAGS_block_manager_max_open_files > file_limit) <<
+      Substitute(
+          "Configured open file limit (block_manager_max_open_files) $0 "
+          "exceeds process fd limit (ulimit) $1",
+          FLAGS_block_manager_max_open_files, file_limit);
+  return FLAGS_block_manager_max_open_files;
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index 3f4cfe4..06b09f5 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -35,6 +35,7 @@ DECLARE_bool(block_coalesce_close);
 
 namespace kudu {
 
+class Env;
 class MemTracker;
 class MetricEntity;
 class Slice;
@@ -207,13 +208,19 @@ class BlockManager {
   //
   // Does not modify 'block' on error.
   virtual Status CreateBlock(const CreateBlockOptions& opts,
-                                      gscoped_ptr<WritableBlock>* block) = 0;
+                             gscoped_ptr<WritableBlock>* block) = 0;
 
   // Like the above but uses default options.
   virtual Status CreateBlock(gscoped_ptr<WritableBlock>* block) = 0;
 
   // Opens an existing block for reading.
   //
+  // While it is safe to delete a block that has already been opened, it is
+  // not safe to do so concurrently with the OpenBlock() call itself. In some
+  // block manager implementations this may result in unusual behavior. For
+  // example, OpenBlock() may succeed but subsequent ReadableBlock operations
+  // may fail.
+  //
   // Does not modify 'block' on error.
   virtual Status OpenBlock(const BlockId& block_id,
                            gscoped_ptr<ReadableBlock>* block) = 0;
@@ -271,6 +278,10 @@ class ScopedWritableBlockCloser {
   std::vector<WritableBlock*> blocks_;
 };
 
+// Compute an upper bound for a file cache embedded within a block manager
+// using resource limits obtained from the system.
+int64_t GetFileCacheCapacityForBlockManager(Env* env);
+
 } // namespace fs
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 5c60329..6d2003d 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/fs/file_block_manager.h"
 
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -26,6 +27,7 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
@@ -508,6 +510,15 @@ FileBlockManager::FileBlockManager(Env* env, const BlockManagerOptions& opts)
     mem_tracker_(MemTracker::CreateTracker(-1,
                                            "file_block_manager",
                                            opts.parent_mem_tracker)) {
+
+  int64_t file_cache_capacity = GetFileCacheCapacityForBlockManager(env_);
+  if (file_cache_capacity != kint64max) {
+    file_cache_.reset(new FileCache<RandomAccessFile>("fbm",
+                                                      env_,
+                                                      file_cache_capacity,
+                                                      opts.metric_entity));
+  }
+
   if (opts.metric_entity) {
     metrics_.reset(new internal::BlockManagerMetrics(opts.metric_entity));
   }
@@ -531,7 +542,12 @@ Status FileBlockManager::Open() {
   } else {
     mode = DataDirManager::LockMode::MANDATORY;
   }
-  return dd_manager_.Open(kMaxPaths, mode);
+  RETURN_NOT_OK(dd_manager_.Open(kMaxPaths, mode));
+
+  if (file_cache_) {
+    RETURN_NOT_OK(file_cache_->Init());
+  }
+  return Status::OK();
 }
 
 Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
@@ -606,7 +622,11 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
   VLOG(1) << "Opening block with id " << block_id.ToString() << " at " << path;
 
   shared_ptr<RandomAccessFile> reader;
-  RETURN_NOT_OK(env_util::OpenFileForRandom(env_, path, &reader));
+  if (file_cache_) {
+    RETURN_NOT_OK(file_cache_->OpenExistingFile(path, &reader));
+  } else {
+    RETURN_NOT_OK(env_util::OpenFileForRandom(env_, path, &reader));
+  }
   block->reset(new internal::FileReadableBlock(this, block_id, reader));
   return Status::OK();
 }
@@ -619,7 +639,11 @@ Status FileBlockManager::DeleteBlock(const BlockId& block_id) {
     return Status::NotFound(
         Substitute("Block $0 not found", block_id.ToString()));
   }
-  RETURN_NOT_OK(env_->DeleteFile(path));
+  if (file_cache_) {
+    RETURN_NOT_OK(file_cache_->DeleteFile(path));
+  } else {
+    RETURN_NOT_OK(env_->DeleteFile(path));
+  }
 
   // We don't bother fsyncing the parent directory as there's nothing to be
   // gained by ensuring that the deletion is made durable. Even if we did

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/file_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 223f6db..23806e1 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -33,8 +33,11 @@
 namespace kudu {
 
 class Env;
+template <class FileType>
+class FileCache;
 class MemTracker;
 class MetricEntity;
+class RandomAccessFile;
 class WritableFile;
 
 namespace fs {
@@ -115,6 +118,9 @@ class FileBlockManager : public BlockManager {
   // Manages and owns all of the block manager's data directories.
   DataDirManager dd_manager_;
 
+  // Manages files opened for reading.
+  std::unique_ptr<FileCache<RandomAccessFile>> file_cache_;
+
   // For generating block IDs.
   ThreadSafeRandom rand_;
   AtomicInt<int64_t> next_block_id_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 52fd9c5..7877cc4 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -36,6 +36,7 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
@@ -102,8 +103,8 @@ using internal::LogBlock;
 using internal::LogBlockContainer;
 using pb_util::ReadablePBContainerFile;
 using pb_util::WritablePBContainerFile;
+using std::shared_ptr;
 using std::string;
-using std::unordered_set;
 using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
@@ -278,7 +279,7 @@ class LogBlockContainer {
  private:
   LogBlockContainer(LogBlockManager* block_manager, DataDir* data_dir,
                     unique_ptr<WritablePBContainerFile> metadata_file,
-                    unique_ptr<RWFile> data_file);
+                    shared_ptr<RWFile> data_file);
 
   // Performs sanity checks on a block record.
   void CheckBlockRecord(const BlockRecordPB& record,
@@ -301,7 +302,7 @@ class LogBlockContainer {
 
   // Opened file handles to the container's files.
   unique_ptr<WritablePBContainerFile> metadata_file_;
-  unique_ptr<RWFile> data_file_;
+  shared_ptr<RWFile> data_file_;
 
   // The amount of data written thus far in the container.
   int64_t total_bytes_written_ = 0;
@@ -317,7 +318,7 @@ LogBlockContainer::LogBlockContainer(
     LogBlockManager* block_manager,
     DataDir* data_dir,
     unique_ptr<WritablePBContainerFile> metadata_file,
-    unique_ptr<RWFile> data_file)
+    shared_ptr<RWFile> data_file)
     : block_manager_(block_manager),
       data_dir_(data_dir),
       metadata_file_(std::move(metadata_file)),
@@ -363,13 +364,31 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
   } while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() ||
                          data_status.IsAlreadyPresent()));
   if (metadata_status.ok() && data_status.ok()) {
-    unique_ptr<WritablePBContainerFile> metadata_file(
-        new WritablePBContainerFile(std::move(metadata_writer)));
+    unique_ptr<WritablePBContainerFile> metadata_file;
+    shared_ptr<RWFile> cached_data_file;
+
+    if (block_manager->file_cache_) {
+      metadata_writer.reset();
+      shared_ptr<RWFile> cached_metadata_writer;
+      RETURN_NOT_OK(block_manager->file_cache_->OpenExistingFile(
+          metadata_path, &cached_metadata_writer));
+      metadata_file.reset(new WritablePBContainerFile(
+          std::move(cached_metadata_writer)));
+
+      data_file.reset();
+      RETURN_NOT_OK(block_manager->file_cache_->OpenExistingFile(
+          data_path, &cached_data_file));
+    } else {
+      metadata_file.reset(new WritablePBContainerFile(
+          std::move(metadata_writer)));
+      cached_data_file = std::move(data_file);
+    }
     RETURN_NOT_OK(metadata_file->Init(BlockRecordPB()));
+
     container->reset(new LogBlockContainer(block_manager,
                                            dir,
                                            std::move(metadata_file),
-                                           std::move(data_file)));
+                                           std::move(cached_data_file)));
     VLOG(1) << "Created log block container " << (*container)->ToString();
   }
 
@@ -424,23 +443,37 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   }
 
   // Open the existing metadata and data files for writing.
-  unique_ptr<RWFile> metadata_writer;
-  RWFileOptions wr_opts;
-  wr_opts.mode = Env::OPEN_EXISTING;
-
-  RETURN_NOT_OK(env->NewRWFile(wr_opts,
-                               metadata_path,
-                               &metadata_writer));
-  unique_ptr<WritablePBContainerFile> metadata_pb_writer(
-      new WritablePBContainerFile(std::move(metadata_writer)));
+  unique_ptr<WritablePBContainerFile> metadata_pb_writer;
+  shared_ptr<RWFile> data_file;
+
+  if (block_manager->file_cache_) {
+    shared_ptr<RWFile> metadata_writer;
+    RETURN_NOT_OK(block_manager->file_cache_->OpenExistingFile(
+        metadata_path, &metadata_writer));
+    metadata_pb_writer.reset(new WritablePBContainerFile(
+        std::move(metadata_writer)));
+
+    RETURN_NOT_OK(block_manager->file_cache_->OpenExistingFile(
+        data_path, &data_file));
+  } else {
+    RWFileOptions wr_opts;
+    wr_opts.mode = Env::OPEN_EXISTING;
+
+    unique_ptr<RWFile> metadata_writer;
+    RETURN_NOT_OK(block_manager->env_->NewRWFile(wr_opts,
+                                                 metadata_path,
+                                                 &metadata_writer));
+    metadata_pb_writer.reset(new WritablePBContainerFile(
+        std::move(metadata_writer)));
+
+    unique_ptr<RWFile> uw;
+    RETURN_NOT_OK(block_manager->env_->NewRWFile(wr_opts,
+                                                 data_path,
+                                                 &uw));
+    data_file = std::move(uw);
+  }
   RETURN_NOT_OK(metadata_pb_writer->Reopen());
 
-  unique_ptr<RWFile> data_file;
-  RWFileOptions rw_opts;
-  rw_opts.mode = Env::OPEN_EXISTING;
-  RETURN_NOT_OK(env->NewRWFile(rw_opts,
-                               data_path,
-                               &data_file));
   uint64_t data_file_size;
   RETURN_NOT_OK(data_file->Size(&data_file_size));
 
@@ -1140,6 +1173,14 @@ LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
     read_only_(opts.read_only),
     next_block_id_(1) {
 
+  int64_t file_cache_capacity = GetFileCacheCapacityForBlockManager(env_);
+  if (file_cache_capacity != kint64max) {
+    file_cache_.reset(new FileCache<RWFile>("lbm",
+                                            env_,
+                                            file_cache_capacity,
+                                            opts.metric_entity));
+  }
+
   // HACK: when running in a test environment, we often instantiate many
   // LogBlockManagers in the same process, eg corresponding to different
   // tablet servers in a minicluster, or due to running many separate test
@@ -1194,6 +1235,10 @@ Status LogBlockManager::Open() {
   }
   RETURN_NOT_OK(dd_manager_.Open(kuint32max, mode));
 
+  if (file_cache_) {
+    RETURN_NOT_OK(file_cache_->Init());
+  }
+
   vector<Status> statuses(dd_manager_.data_dirs().size());
   int i = 0;
   for (const auto& dd : dd_manager_.data_dirs()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 2700610..817d8af 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -19,7 +19,6 @@
 #define KUDU_FS_LOG_BLOCK_MANAGER_H
 
 #include <deque>
-#include <gtest/gtest_prod.h>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -27,6 +26,8 @@
 #include <utility>
 #include <vector>
 
+#include <gtest/gtest_prod.h>
+
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
@@ -40,7 +41,10 @@
 
 namespace kudu {
 class Env;
+template <class FileType>
+class FileCache;
 class MetricEntity;
+class RWFile;
 class ThreadPool;
 
 namespace fs {
@@ -277,6 +281,9 @@ class LogBlockManager : public BlockManager {
   // Manages and owns all of the block manager's data directories.
   DataDirManager dd_manager_;
 
+  // Manages the containers' open metadata and data files (as RWFiles).
+  std::unique_ptr<FileCache<RWFile>> file_cache_;
+
   // Maps block IDs to blocks that are now readable, either because they
   // already existed on disk when the block manager was opened, or because
   // they're WritableBlocks that were closed.

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 847d2a2..ea9d266 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -176,21 +176,13 @@ TEST_P(TestRpc, TestWrongService) {
                       "not registered on TestServer");
 }
 
-namespace {
-int GetOpenFileLimit() {
-  struct rlimit limit;
-  PCHECK(getrlimit(RLIMIT_NOFILE, &limit) == 0);
-  return limit.rlim_cur;
-}
-} // anonymous namespace
-
 // Test that we can still make RPC connections even if many fds are in use.
 // This is a regression test for KUDU-650.
 TEST_P(TestRpc, TestHighFDs) {
   // This test can only run if ulimit is set high.
   const int kNumFakeFiles = 3500;
   const int kMinUlimit = kNumFakeFiles + 100;
-  if (GetOpenFileLimit() < kMinUlimit) {
+  if (env_->GetOpenFileLimit() < kMinUlimit) {
     LOG(INFO) << "Test skipped: must increase ulimit -n to at least " << kMinUlimit;
     return;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index adc2508..2748413 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -287,6 +287,15 @@ class Env {
   // Get the total amount of RAM installed on this machine.
   virtual Status GetTotalRAMBytes(int64_t* ram) = 0;
 
+  // Get the max number of file descriptors that this process can open.
+  virtual int64_t GetOpenFileLimit() = 0;  // PosixEnv: the RLIMIT_NOFILE soft limit.
+
+  // Increase the max number of file descriptors that this process can open as
+  // much as possible; the change is process-wide. On UNIX platforms, this means
+  // increasing the RLIMIT_NOFILE resource soft limit (the limit actually
+  // enforced by the kernel) to be equal to the hard limit.
+  virtual void IncreaseOpenFileLimit() = 0;
+
  private:
   // No copying allowed
   Env(const Env&);

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 33bfcac..458cdd0 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -10,6 +10,7 @@
 #include <limits.h>
 #include <pthread.h>
 #include <sys/mman.h>
+#include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/statvfs.h>
 #include <sys/time.h>
@@ -1222,6 +1223,31 @@ class PosixEnv : public Env {
     return Status::OK();
   }
 
+  virtual int64_t GetOpenFileLimit() OVERRIDE {
+    // Returns the current RLIMIT_NOFILE soft limit; getrlimit() should never fail here.
+    struct rlimit l;
+    PCHECK(getrlimit(RLIMIT_NOFILE, &l) == 0);
+    return l.rlim_cur;  // NOTE(review): RLIM_INFINITY may wrap negative here; confirm.
+  }
+
+  virtual void IncreaseOpenFileLimit() OVERRIDE {
+    // Raises the RLIMIT_NOFILE soft limit to the hard limit. There's no reason
+    // for this to ever fail; any process should have sufficient privilege to
+    // increase its soft limit up to the hard limit. Both outcomes are logged;
+    // raising the limit is a process-wide change.
+    struct rlimit l;
+    PCHECK(getrlimit(RLIMIT_NOFILE, &l) == 0);
+    if (l.rlim_cur < l.rlim_max) {  // Soft limit is still below the hard limit.
+      LOG(INFO) << Substitute("Raising process file limit from $0 to $1",
+                              l.rlim_cur, l.rlim_max);
+      l.rlim_cur = l.rlim_max;
+      // NOTE(review): some platforms may reject rlim_max here (e.g. macOS); confirm.
+      PCHECK(setrlimit(RLIMIT_NOFILE, &l) == 0);
+    } else {
+      LOG(INFO) << Substitute("Not raising process file limit of $0; it is "
+          "already as high as it can go", l.rlim_cur);
+    }
+  }
+
  private:
   // unique_ptr Deleter implementation for fts_close
   struct FtsCloser {

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index d4eba50..44c7534 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -541,7 +541,7 @@ void TruncateFields(Message* message, int max_len) {
   }
 }
 
-WritablePBContainerFile::WritablePBContainerFile(unique_ptr<RWFile> writer)
+WritablePBContainerFile::WritablePBContainerFile(shared_ptr<RWFile> writer)
   : state_(FileState::NOT_INITIALIZED),
     offset_(0),
     version_(kPBContainerDefaultVersion),
@@ -756,7 +756,7 @@ void WritablePBContainerFile::PopulateDescriptorSet(
   all_descs.Swap(output);
 }
 
-ReadablePBContainerFile::ReadablePBContainerFile(unique_ptr<RandomAccessFile> reader)
+ReadablePBContainerFile::ReadablePBContainerFile(shared_ptr<RandomAccessFile> reader)
   : state_(FileState::NOT_INITIALIZED),
     version_(kPBContainerInvalidVersion),
     offset_(0),

http://git-wip-us.apache.org/repos/asf/kudu/blob/3fa9db76/src/kudu/util/pb_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.h b/src/kudu/util/pb_util.h
index 4c2a084..eef5d27 100644
--- a/src/kudu/util/pb_util.h
+++ b/src/kudu/util/pb_util.h
@@ -252,7 +252,7 @@ class WritablePBContainerFile {
  public:
 
   // Initializes the class instance; writer must be open.
-  explicit WritablePBContainerFile(std::unique_ptr<RWFile> writer);
+  explicit WritablePBContainerFile(std::shared_ptr<RWFile> writer);
 
   // Closes the container if not already closed.
   ~WritablePBContainerFile();
@@ -347,7 +347,7 @@ class WritablePBContainerFile {
   int version_;
 
   // File writer.
-  std::unique_ptr<RWFile> writer_;
+  std::shared_ptr<RWFile> writer_;
 };
 
 // Protobuf container file opened for reading.
@@ -358,7 +358,7 @@ class ReadablePBContainerFile {
  public:
 
   // Initializes the class instance; reader must be open.
-  explicit ReadablePBContainerFile(std::unique_ptr<RandomAccessFile> reader);
+  explicit ReadablePBContainerFile(std::shared_ptr<RandomAccessFile> reader);
 
   // Closes the file if not already closed.
   ~ReadablePBContainerFile();
@@ -421,7 +421,7 @@ class ReadablePBContainerFile {
   // Wrapped in a unique_ptr so that clients need not include PB headers.
   std::unique_ptr<google::protobuf::FileDescriptorSet> protos_;
 
-  std::unique_ptr<RandomAccessFile> reader_;
+  std::shared_ptr<RandomAccessFile> reader_;
 };
 
 // Convenience functions for protobuf containers holding just one record.