You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/01/22 22:25:52 UTC

[kudu] branch master updated (69cf206 -> fc4ab69)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 69cf206  [clock] auto-config of built-in NTP client in cloud
     new 21073b0  file cache: support alternate open modes
     new fc4ab69  log: start using file cache

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/consensus/consensus_peers-test.cc         |  17 +--
 src/kudu/consensus/consensus_queue-test.cc         |   1 +
 src/kudu/consensus/log-test-base.h                 |  18 ++-
 src/kudu/consensus/log-test.cc                     |  40 ++++--
 src/kudu/consensus/log.cc                          |  50 ++++++--
 src/kudu/consensus/log.h                           |   5 +-
 src/kudu/consensus/log_cache-test.cc               |   3 +-
 src/kudu/consensus/log_index-test.cc               |  10 +-
 src/kudu/consensus/log_index.cc                    |  28 ++--
 src/kudu/consensus/log_index.h                     |   6 +-
 src/kudu/consensus/log_reader.cc                   |  13 +-
 src/kudu/consensus/log_reader.h                    |  11 +-
 src/kudu/consensus/log_util.cc                     |  23 +++-
 src/kudu/consensus/log_util.h                      |   2 +
 src/kudu/consensus/mt-log-test.cc                  |  11 +-
 src/kudu/consensus/raft_consensus_quorum-test.cc   |   6 +-
 src/kudu/fs/file_block_manager.cc                  |   3 +-
 src/kudu/fs/log_block_manager.cc                   |  73 ++++++-----
 src/kudu/integration-tests/log_verifier.cc         |   9 +-
 .../timestamp_advancement-itest.cc                 |   7 +-
 src/kudu/integration-tests/ts_recovery-itest.cc    |   3 +-
 src/kudu/master/sys_catalog.cc                     |   7 +-
 src/kudu/tablet/tablet_bootstrap-test.cc           |  18 +--
 src/kudu/tablet/tablet_bootstrap.cc                |  13 +-
 src/kudu/tablet/tablet_bootstrap.h                 |   4 +-
 src/kudu/tablet/tablet_replica-test.cc             |  22 ++--
 src/kudu/tools/kudu-tool-test.cc                   |   3 +-
 src/kudu/tools/tool_action_local_replica.cc        |  14 +-
 src/kudu/tools/tool_action_perf.cc                 |   3 +-
 src/kudu/tools/tool_action_wal.cc                  |   5 +-
 .../tserver/tablet_copy_source_session-test.cc     |  11 +-
 src/kudu/tserver/ts_tablet_manager.cc              |   3 +-
 src/kudu/tserver/ts_tablet_manager.h               |   5 +-
 src/kudu/util/file_cache-stress-test.cc            |   4 +-
 src/kudu/util/file_cache-test.cc                   |  90 ++++++++++---
 src/kudu/util/file_cache.cc                        | 142 ++++++++++++++++-----
 src/kudu/util/file_cache.h                         |  49 +++++--
 37 files changed, 518 insertions(+), 214 deletions(-)


[kudu] 01/02: file cache: support alternate open modes

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 21073b06ce30171b10ed09bee133fcafe650f022
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Mon Jan 20 21:18:08 2020 -0800

    file cache: support alternate open modes
    
    Previously, the file cache only supported opening existing files; any new
    file creation happened out-of-band and then the file reopened via the cache.
    If we're going to use the file cache for log index chunks, however, we need
    to support CREATE_OR_OPEN style usage, and doing it in the log index itself
    is somewhat hairy.
    
    This patch modifies the file cache to support most of the modes defined in
    Env::OpenMode. I tried to ensure that cache operations look and feel like a
    standard POSIX filesystem, but it's tough to get this right, and I'm sure I
    missed some corner cases. I feel pretty good about our use cases (block
    managers, log segments, and log index chunks) though.
    
    Change-Id: Ie167302ef85b8e1a40fbb89a7742e2cbb43bcec3
    Reviewed-on: http://gerrit.cloudera.org:8080/15081
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/fs/file_block_manager.cc       |   3 +-
 src/kudu/fs/log_block_manager.cc        |  73 ++++++++--------
 src/kudu/util/file_cache-stress-test.cc |   4 +-
 src/kudu/util/file_cache-test.cc        |  90 ++++++++++++++++----
 src/kudu/util/file_cache.cc             | 142 ++++++++++++++++++++++++--------
 src/kudu/util/file_cache.h              |  49 ++++++++---
 6 files changed, 259 insertions(+), 102 deletions(-)

diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 0c5796c..09b2a99 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -829,7 +829,8 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
 
   shared_ptr<RandomAccessFile> reader;
   if (PREDICT_TRUE(file_cache_)) {
-    RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->OpenExistingFile(path, &reader));
+    RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->OpenFile<Env::MUST_EXIST>(
+        path, &reader));
   } else {
     unique_ptr<RandomAccessFile> r;
     RETURN_NOT_OK_FBM_DISK_FAILURE(env_->NewRandomAccessFile(path, &r));
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index b09f319..cb40778 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -764,42 +764,47 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
   //
   // When looping, we delete any created-and-orphaned files.
   do {
-    unique_ptr<RWFile> rwf;
-
-    if (metadata_writer) {
-      block_manager->env()->DeleteFile(metadata_path);
-    }
     common_path = JoinPathSegments(dir->dir(),
                                    block_manager->oid_generator()->Next());
     metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
-    RWFileOptions wr_opts;
-    wr_opts.mode = Env::MUST_CREATE;
-    metadata_status = block_manager->env()->NewRWFile(wr_opts,
-                                                      metadata_path,
-                                                      &rwf);
-    metadata_writer.reset(rwf.release());
-
-    if (data_file) {
-      block_manager->env()->DeleteFile(data_path);
-    }
     data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
-    RWFileOptions rw_opts;
-    rw_opts.mode = Env::MUST_CREATE;
-    data_status = block_manager->env()->NewRWFile(rw_opts,
-                                                  data_path,
-                                                  &rwf);
-    data_file.reset(rwf.release());
+
+    if (PREDICT_TRUE(block_manager->file_cache_)) {
+      if (metadata_writer) {
+        WARN_NOT_OK(block_manager->file_cache_->DeleteFile(metadata_path),
+                    "could not delete orphaned metadata file thru file cache");
+      }
+      if (data_file) {
+        WARN_NOT_OK(block_manager->file_cache_->DeleteFile(data_path),
+                    "could not delete orphaned data file thru file cache");
+      }
+      metadata_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>(
+          metadata_path, &metadata_writer);
+      data_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>(
+          data_path, &data_file);
+    } else {
+      if (metadata_writer) {
+        WARN_NOT_OK(block_manager->env()->DeleteFile(metadata_path),
+                    "could not delete orphaned metadata file");
+      }
+      if (data_file) {
+        WARN_NOT_OK(block_manager->env()->DeleteFile(data_path),
+                    "could not delete orphaned data file");
+      }
+      unique_ptr<RWFile> rwf;
+      RWFileOptions rw_opts;
+
+      rw_opts.mode = Env::MUST_CREATE;
+      metadata_status = block_manager->env()->NewRWFile(
+          rw_opts, metadata_path, &rwf);
+      metadata_writer.reset(rwf.release());
+      data_status = block_manager->env()->NewRWFile(
+          rw_opts, data_path, &rwf);
+      data_file.reset(rwf.release());
+    }
   } while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() ||
                          data_status.IsAlreadyPresent()));
   if (metadata_status.ok() && data_status.ok()) {
-    if (PREDICT_TRUE(block_manager->file_cache_)) {
-      metadata_writer.reset();
-      RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
-          metadata_path, &metadata_writer));
-      data_file.reset();
-      RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
-          data_path, &data_file));
-    }
     unique_ptr<WritablePBContainerFile> metadata_file(new WritablePBContainerFile(
         std::move(metadata_writer)));
     RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB()));
@@ -834,10 +839,10 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   shared_ptr<RWFile> metadata_file;
   shared_ptr<RWFile> data_file;
   if (PREDICT_TRUE(block_manager->file_cache_)) {
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
-        metadata_path, &metadata_file));
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
-        data_path, &data_file));
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(
+        block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(metadata_path, &metadata_file));
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(
+        block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(data_path, &data_file));
   } else {
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
@@ -1224,7 +1229,7 @@ Status LogBlockContainer::SyncMetadata() {
 Status LogBlockContainer::ReopenMetadataWriter() {
   shared_ptr<RWFile> f;
   if (PREDICT_TRUE(block_manager_->file_cache_)) {
-    RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenExistingFile(
+    RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenFile<Env::MUST_EXIST>(
         metadata_file_->filename(), &f));
   } else {
     unique_ptr<RWFile> f_uniq;
diff --git a/src/kudu/util/file_cache-stress-test.cc b/src/kudu/util/file_cache-stress-test.cc
index 8fa1b74..bd3fd22 100644
--- a/src/kudu/util/file_cache-stress-test.cc
+++ b/src/kudu/util/file_cache-stress-test.cc
@@ -172,13 +172,13 @@ class FileCacheStressTest : public KuduTest {
         }
         if (HasPrefixString(BaseName(to_open), kRWFPrefix)) {
           shared_ptr<RWFile> rwf;
-          TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &rwf));
+          TEST_CHECK_OK(cache_->OpenFile<Env::MUST_EXIST>(to_open, &rwf));
           rwfs.emplace_back(std::move(rwf));
         } else {
           CHECK(HasPrefixString(BaseName(to_open), kRAFPrefix));
 
           shared_ptr<RandomAccessFile> raf;
-          TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &raf));
+          TEST_CHECK_OK(cache_->OpenFile<Env::MUST_EXIST>(to_open, &raf));
           rafs.emplace_back(std::move(raf));
         }
         FinishedOpen(to_open);
diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc
index 1fbfc38..01ecfd5 100644
--- a/src/kudu/util/file_cache-test.cc
+++ b/src/kudu/util/file_cache-test.cc
@@ -121,7 +121,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
   // Open a non-existent file.
   {
     shared_ptr<TypeParam> f;
-    ASSERT_TRUE(this->cache_->OpenExistingFile(
+    ASSERT_TRUE(this->cache_->template OpenFile<Env::MUST_EXIST>(
         "/does/not/exist", &f).IsNotFound());
     NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
   }
@@ -139,7 +139,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
   {
     // Open a test file. It should open an fd and create a descriptor.
     shared_ptr<TypeParam> f1;
-    ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f1));
+    ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f1));
     NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
 
     // Spot check the test data by comparing sizes.
@@ -153,7 +153,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
     // Open the same file a second time. It should reuse the existing
     // descriptor and not open a second fd.
     shared_ptr<TypeParam> f2;
-    ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+    ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f2));
     NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
     {
       auto uh(this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE));
@@ -163,7 +163,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
     // Open a second file. This will create a new descriptor, but evict the fd
     // opened for the first file, so the fd count should remain constant.
     shared_ptr<TypeParam> f3;
-    ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f3));
+    ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f3));
     NO_FATALS(this->AssertFdsAndDescriptors(1, 2));
     {
       auto uh(this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE));
@@ -207,12 +207,12 @@ TYPED_TEST(FileCacheTest, TestDeletion) {
   ASSERT_TRUE(this->env_->FileExists(kFile2));
   {
     shared_ptr<TypeParam> f1;
-    ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f1));
+    ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f1));
     ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds());
     ASSERT_OK(this->cache_->DeleteFile(kFile2));
     {
       shared_ptr<TypeParam> f2;
-      ASSERT_TRUE(this->cache_->OpenExistingFile(kFile2, &f2).IsNotFound());
+      ASSERT_TRUE(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f2).IsNotFound());
     }
     ASSERT_TRUE(this->cache_->DeleteFile(kFile2).IsNotFound());
     ASSERT_TRUE(this->env_->FileExists(kFile2));
@@ -229,7 +229,7 @@ TYPED_TEST(FileCacheTest, TestDeletion) {
   ASSERT_OK(this->WriteTestFile(kFile3, kData3));
   {
     shared_ptr<TypeParam> f3;
-    ASSERT_OK(this->cache_->OpenExistingFile(kFile3, &f3));
+    ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile3, &f3));
   }
   ASSERT_TRUE(this->env_->FileExists(kFile3));
   ASSERT_EQ(this->initial_open_fds_, this->CountOpenFds());
@@ -245,7 +245,7 @@ TYPED_TEST(FileCacheTest, TestInvalidation) {
 
   // Open the file.
   shared_ptr<TypeParam> f;
-  ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f));
+  ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f));
 
   // Write a new file and rename it in place on top of file1.
   const string kFile2 = this->GetTestPath("foo2");
@@ -266,7 +266,7 @@ TYPED_TEST(FileCacheTest, TestInvalidation) {
   // But if we re-open the path again, the new descriptor should read the
   // new data.
   shared_ptr<TypeParam> f2;
-  ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+  ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f2));
   ASSERT_OK(f2->Size(&size));
   ASSERT_EQ(kData2.size(), size);
 }
@@ -291,7 +291,7 @@ TYPED_TEST(FileCacheTest, TestHeavyReads) {
     string filename = this->GetTestPath(Substitute("$0", i));
     ASSERT_OK(this->WriteTestFile(filename, data));
     shared_ptr<TypeParam> f;
-    ASSERT_OK(this->cache_->OpenExistingFile(filename, &f));
+    ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(filename, &f));
     opened_files.push_back(f);
   }
 
@@ -328,7 +328,7 @@ TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) {
     threads.emplace_back([&]() {
       for (int i = 0; i < 10000; i++) {
         shared_ptr<TypeParam> f;
-        CHECK_OK(this->cache_->OpenExistingFile(kFile, &f));
+        CHECK_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile, &f));
       }
     });
   }
@@ -346,13 +346,67 @@ TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
   ASSERT_OK(this->WriteTestFile(kFile, "test data"));
 
   shared_ptr<RandomAccessFile> f;
-  ASSERT_OK(cache_->OpenExistingFile(kFile, &f));
+  ASSERT_OK(cache_->OpenFile<Env::MUST_EXIST>(kFile, &f));
 
   // This used to crash due to a kudu_malloc_usable_size() call on a memory
   // address that wasn't the start of an actual heap allocation.
   LOG(INFO) << f->memory_footprint();
 }
 
+class RWFileCacheTest : public FileCacheTest<RWFile> {
+};
+
+TEST_F(RWFileCacheTest, TestOpenMustCreate) {
+  const string kFile1 = this->GetTestPath("foo");
+  const string kFile2 = this->GetTestPath("bar");
+
+  {
+    shared_ptr<RWFile> rwf1;
+    ASSERT_OK(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf1));
+    NO_FATALS(AssertFdsAndDescriptors(1, 1));
+
+    // If there's already a descriptor, a second open will fail in the file cache.
+    shared_ptr<RWFile> rwf2;
+    ASSERT_TRUE(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf2).IsAlreadyPresent());
+
+    // Now let's evict kFile1.
+    shared_ptr<RWFile> rwf3;
+    ASSERT_OK(cache_->OpenFile<Env::MUST_CREATE>(kFile2, &rwf3));
+    NO_FATALS(AssertFdsAndDescriptors(1, 2));
+
+    // The reopen of kFile1 shouldn't be with MUST_CREATE; otherwise this would fail.
+    ASSERT_OK(rwf1->Sync());
+  }
+  {
+    // Without any existing descriptors, open will fail in the filesystem.
+    NO_FATALS(AssertFdsAndDescriptors(0, 0));
+    shared_ptr<RWFile> rwf;
+    ASSERT_TRUE(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf).IsAlreadyPresent());
+  }
+}
+
+TEST_F(RWFileCacheTest, TestOpenCreateOrOpen) {
+  const string kFile1 = this->GetTestPath("foo");
+  const string kFile2 = this->GetTestPath("bar");
+
+  shared_ptr<RWFile> rwf1;
+  ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile1, &rwf1));
+
+  // If there's already a descriptor, a second open will also succeed.
+  shared_ptr<RWFile> rwf2;
+  ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile1, &rwf2));
+
+  // Now let's evict kFile1.
+  shared_ptr<RWFile> rwf3;
+  ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile2, &rwf3));
+  NO_FATALS(AssertFdsAndDescriptors(1, 2));
+
+  // The reopen of kFile1 should use MUST_EXIST. If we delete the file out
+  // from under the cache, we can see this in action as the reopen fails.
+  ASSERT_OK(env_->DeleteFile(kFile1));
+  ASSERT_TRUE(rwf1->Sync().IsNotFound());
+}
+
 class MixedFileCacheTest : public KuduTest {
 };
 
@@ -376,9 +430,9 @@ TEST_F(MixedFileCacheTest, TestBothFileTypes) {
 
   // Open the test files, each as a different file type.
   shared_ptr<RWFile> rwf;
-  ASSERT_OK(cache.OpenExistingFile(kFile1, &rwf));
+  ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile1, &rwf));
   shared_ptr<RandomAccessFile> raf;
-  ASSERT_OK(cache.OpenExistingFile(kFile2, &raf));
+  ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile2, &raf));
 
   // Verify the correct file contents for each test file.
   uint64_t size;
@@ -397,13 +451,13 @@ TEST_F(MixedFileCacheTest, TestBothFileTypes) {
   //
   // These checks are expensive so they're only done in DEBUG mode.
   shared_ptr<RWFile> rwf2;
-  ASSERT_OK(cache.OpenExistingFile(kFile1, &rwf2));
+  ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile1, &rwf2));
   shared_ptr<RandomAccessFile> raf2;
-  ASSERT_OK(cache.OpenExistingFile(kFile2, &raf2));
+  ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile2, &raf2));
 #ifndef NDEBUG
-  ASSERT_DEATH({ cache.OpenExistingFile(kFile1, &raf); },
+  ASSERT_DEATH({ cache.OpenFile<Env::MUST_EXIST>(kFile1, &raf); },
                "!FindDescriptorUnlocked");
-  ASSERT_DEATH({ cache.OpenExistingFile(kFile2, &rwf); },
+  ASSERT_DEATH({ cache.OpenFile<Env::MUST_EXIST>(kFile2, &rwf); },
                "!FindDescriptorUnlocked");
 #endif
 }
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index 8fb0adf..0bbbfe7 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -240,55 +240,55 @@ class Descriptor<RWFile> : public RWFile {
 
   Status Read(uint64_t offset, Slice result) const override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->Read(offset, result);
   }
 
   Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->ReadV(offset, results);
   }
 
   Status Write(uint64_t offset, const Slice& data) override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->Write(offset, data);
   }
 
   Status WriteV(uint64_t offset, ArrayView<const Slice> data) override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->WriteV(offset, data);
   }
 
   Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->PreAllocate(offset, length, mode);
   }
 
   Status Truncate(uint64_t length) override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->Truncate(length);
   }
 
   Status PunchHole(uint64_t offset, size_t length) override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->PunchHole(offset, length);
   }
 
   Status Flush(FlushMode mode, uint64_t offset, size_t length) override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->Flush(mode, offset, length);
   }
 
   Status Sync() override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->Sync();
   }
 
@@ -299,13 +299,13 @@ class Descriptor<RWFile> : public RWFile {
 
   Status Size(uint64_t* size) const override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->Size(size);
   }
 
   Status GetExtentMap(ExtentMap* out) const override {
     ScopedOpenedDescriptor<RWFile> opened(&base_);
-    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
     return opened.file()->GetExtentMap(out);
   }
 
@@ -316,14 +316,17 @@ class Descriptor<RWFile> : public RWFile {
  private:
   friend class ::kudu::FileCache;
 
+  template <Env::OpenMode Mode>
   Status Init() {
-    return once_.Init(&Descriptor<RWFile>::InitOnce, this);
+    return once_.Init(&Descriptor<RWFile>::InitOnce<Mode>, this);
   }
 
+  template <Env::OpenMode Mode>
   Status InitOnce() {
-    return ReopenFileIfNecessary(nullptr);
+    return ReopenFileIfNecessary<Mode>(nullptr);
   }
 
+  template <Env::OpenMode Mode>
   Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
     ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
     CHECK(!base_.invalidated());
@@ -337,7 +340,7 @@ class Descriptor<RWFile> : public RWFile {
 
     // The file was evicted, reopen it.
     RWFileOptions opts;
-    opts.mode = Env::MUST_EXIST;
+    opts.mode = Mode;
     unique_ptr<RWFile> f;
     RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));
 
@@ -488,50 +491,112 @@ Status FileCache::Init() {
 }
 
 template <>
-Status FileCache::OpenExistingFile(const string& file_name,
-                                   shared_ptr<RWFile>* file) {
+Status FileCache::DoOpenFile(const string& file_name,
+                             shared_ptr<internal::Descriptor<RWFile>>* file,
+                             bool* created_desc) {
   shared_ptr<internal::Descriptor<RWFile>> d;
+  bool cd;
   {
     std::lock_guard<simple_spinlock> l(lock_);
-    d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &rwf_descs_);
+    d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
+                               &rwf_descs_, &cd);
     DCHECK(d);
 
+#ifndef NDEBUG
     // Enforce the invariant that a particular file name may only be used by one
     // descriptor at a time. This is expensive so it's only done in DEBUG mode.
-    DCHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_));
+    bool ignored;
+    CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
+                                  &raf_descs_, &ignored));
+#endif
   }
   if (d->base_.deleted()) {
     return Status::NotFound(kAlreadyDeleted, file_name);
   }
-
-  // Check that the underlying file can be opened (no-op for found descriptors).
-  //
-  // Done outside the lock.
-  RETURN_NOT_OK(d->Init());
   *file = std::move(d);
+  *created_desc = cd;
   return Status::OK();
 }
 
 template <>
-Status FileCache::OpenExistingFile(const string& file_name,
-                                   shared_ptr<RandomAccessFile>* file) {
+Status FileCache::DoOpenFile(const string& file_name,
+                             shared_ptr<internal::Descriptor<RandomAccessFile>>* file,
+                             bool* created_desc) {
   shared_ptr<internal::Descriptor<RandomAccessFile>> d;
+  bool cd;
   {
     std::lock_guard<simple_spinlock> l(lock_);
-    d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &raf_descs_);
+    d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
+                               &raf_descs_, &cd);
     DCHECK(d);
 
+#ifndef NDEBUG
     // Enforce the invariant that a particular file name may only be used by one
     // descriptor at a time. This is expensive so it's only done in DEBUG mode.
-    DCHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_));
+    bool ignored;
+    CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
+                                  &rwf_descs_, &ignored));
+#endif
   }
   if (d->base_.deleted()) {
     return Status::NotFound(kAlreadyDeleted, file_name);
   }
+  *file = std::move(d);
+  *created_desc = cd;
+  return Status::OK();
+}
+
+template <>
+Status FileCache::OpenFile<Env::CREATE_OR_OPEN>(const string& file_name,
+                                                shared_ptr<RWFile>* file) {
+  shared_ptr<internal::Descriptor<RWFile>> d;
+  bool ignored;
+  RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored));
+
+  // Check that the underlying file can be opened (no-op for found descriptors).
+  RETURN_NOT_OK(d->Init<Env::CREATE_OR_OPEN>());
+  *file = std::move(d);
+  return Status::OK();
+}
+
+template <>
+Status FileCache::OpenFile<Env::MUST_CREATE>(const string& file_name,
+                                             shared_ptr<RWFile>* file) {
+  shared_ptr<internal::Descriptor<RWFile>> d;
+  bool created_desc;
+  RETURN_NOT_OK(DoOpenFile(file_name, &d, &created_desc));
+
+  if (!created_desc) {
+    return Status::AlreadyPresent("file already exists", file_name);
+  }
+
+  // Check that the underlying file can be opened (no-op for found descriptors).
+  RETURN_NOT_OK(d->Init<Env::MUST_CREATE>());
+  *file = std::move(d);
+  return Status::OK();
+}
+
+template <>
+Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name,
+                                            shared_ptr<RWFile>* file) {
+  shared_ptr<internal::Descriptor<RWFile>> d;
+  bool ignored;
+  RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored));
+
+  // Check that the underlying file can be opened (no-op for found descriptors).
+  RETURN_NOT_OK(d->Init<Env::MUST_EXIST>());
+  *file = std::move(d);
+  return Status::OK();
+}
+
+template <>
+Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name,
+                                            shared_ptr<RandomAccessFile>* file) {
+  shared_ptr<internal::Descriptor<RandomAccessFile>> d;
+  bool ignored;
+  RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored));
 
   // Check that the underlying file can be opened (no-op for found descriptors).
-  //
-  // Done outside the lock.
   RETURN_NOT_OK(d->Init());
   *file = std::move(d);
   return Status::OK();
@@ -543,8 +608,10 @@ Status FileCache::DeleteFile(const string& file_name) {
   // descriptor in the first map.
   {
     std::lock_guard<simple_spinlock> l(lock_);
+    bool ignored;
     {
-      auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_);
+      auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
+                                      &rwf_descs_, &ignored);
       if (d) {
         if (d->base_.deleted()) {
           return Status::NotFound(kAlreadyDeleted, file_name);
@@ -554,7 +621,8 @@ Status FileCache::DeleteFile(const string& file_name) {
       }
     }
     {
-      auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_);
+      auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
+                                      &raf_descs_, &ignored);
       if (d) {
         if (d->base_.deleted()) {
           return Status::NotFound(kAlreadyDeleted, file_name);
@@ -585,13 +653,14 @@ void FileCache::Invalidate(const string& file_name) {
   shared_ptr<internal::Descriptor<RandomAccessFile>> raf_desc;
   {
     std::lock_guard<simple_spinlock> l(lock_);
+    bool ignored;
     rwf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
-                                      &rwf_descs_);
+                                      &rwf_descs_, &ignored);
     DCHECK(rwf_desc);
     rwf_desc->base_.MarkInvalidated();
 
     raf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
-                                      &raf_descs_);
+                                      &raf_descs_, &ignored);
     DCHECK(raf_desc);
     raf_desc->base_.MarkInvalidated();
   }
@@ -669,7 +738,8 @@ template <class FileType>
 shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked(
     const string& file_name,
     FindMode mode,
-    DescriptorMap<FileType>* descs) {
+    DescriptorMap<FileType>* descs,
+    bool* created_desc) {
   DCHECK(lock_.is_locked());
 
   shared_ptr<internal::Descriptor<FileType>> d;
@@ -682,6 +752,7 @@ shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked(
 
       // Descriptor is still valid, return it.
       VLOG(2) << "Found existing descriptor: " << file_name;
+      *created_desc = false;
       return d;
     }
     // Descriptor has expired; erase it and pretend we found nothing.
@@ -692,6 +763,9 @@ shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked(
     d = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
     EmplaceOrDie(descs, file_name, d);
     VLOG(2) << "Created new descriptor: " << file_name;
+    *created_desc = true;
+  } else {
+    *created_desc = false;
   }
   return d;
 }
diff --git a/src/kudu/util/file_cache.h b/src/kudu/util/file_cache.h
index ed3663c..c22ee91 100644
--- a/src/kudu/util/file_cache.h
+++ b/src/kudu/util/file_cache.h
@@ -28,15 +28,12 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/cache.h"
 #include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
-class Env;
-class RWFile;
-class RandomAccessFile;
-
 namespace internal {
 
 template <class FileType>
@@ -63,7 +60,7 @@ class Thread;
 // The core of the client-facing API is the cache descriptor. A descriptor
 // uniquely identifies an opened file. To a client, a descriptor is just an
 // open file interface of the variety defined in util/env.h. Clients open
-// descriptors via the OpenExistingFile() cache methods.
+// descriptors via the OpenFile() cache methods.
 //
 // Descriptors are shared objects; an existing descriptor is handed back to a
 // client if a file with the same name is already opened. To facilitate
@@ -118,12 +115,27 @@ class FileCache {
   // to a file-like interface but interfaces with the cache under the hood to
   // reopen a file as needed during file operations.
   //
-  // The underlying file is opened immediately to verify that it indeed exists,
-  // but may be closed later if the cache reaches its upper bound on the number
-  // of open files. It is also closed when the descriptor's last reference is dropped.
-  template <class FileType>
-  Status OpenExistingFile(const std::string& file_name,
-                          std::shared_ptr<FileType>* file);
+  // The underlying file is opened immediately to respect 'Mode', but may be
+  // closed later if the cache reaches its upper bound on the number of open
+  // files. It is also closed when the descriptor's last reference is dropped.
+  //
+  // All file types honor a 'Mode' of MUST_EXIST. Some may honor other modes as
+  // well, although transparently reopening evicted files will always use
+  // MUST_EXIST. Different combinations of modes and file types are expressed as
+  // template specializations; if a file type doesn't support a particular mode,
+  // there will be a linker error.
+  //
+  // TODO(adar): The file cache tries to behave as if users were accessing the
+  // underlying POSIX filesystem directly, but its semantics aren't 100% correct
+  // when using modes other than MUST_EXIST. For example, the behavior of
+  // MUST_CREATE and CREATE_OR_OPEN isn't quite right for open files marked for
+  // deletion. In theory we should "unmark" such a file to indicate that it was
+  // recreated, and truncate it so it's empty for the second client, but the
+  // truncation would corrupt the file for the first client. In short, take
+  // great care when using any mode apart from MUST_EXIST.
+  template <Env::OpenMode Mode, class FileType>
+  Status OpenFile(const std::string& file_name,
+                  std::shared_ptr<FileType>* file);
 
   // Deletes a file by name through the cache.
   //
@@ -183,7 +195,10 @@ class FileCache {
   template <class FileType>
   static void ExpireDescriptorsFromMap(DescriptorMap<FileType>* descs);
 
-  // Looks up a descriptor by file name.
+  // Looks up a descriptor by file name or creates a new one (if requested).
+  //
+  // The value of 'created_desc' will be set in accordance with whether a new
+  // descriptor was created.
   //
   // Must be called with 'lock_' held.
   enum class FindMode {
@@ -197,11 +212,19 @@ class FileCache {
   std::shared_ptr<internal::Descriptor<FileType>> FindDescriptorUnlocked(
       const std::string& file_name,
       FindMode mode,
-      DescriptorMap<FileType>* descs);
+      DescriptorMap<FileType>* descs,
+      bool* created_desc);
 
   // Periodically removes expired descriptors from the descriptor maps.
   void RunDescriptorExpiry();
 
+  // Actually opens the file as per OpenFile. Used to encapsulate the bulk of
+  // OpenFile because C++ prohibits partial specialization of template functions.
+  template <class FileType>
+  Status DoOpenFile(const std::string& file_name,
+                    std::shared_ptr<FileType>* file,
+                    bool* created_desc);
+
   // Status message prefix for files that have already been marked as deleted.
   static const char* const kAlreadyDeleted;
 


[kudu] 02/02: log: start using file cache

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fc4ab691a502067bc4d5bdff30507cac7feb7cfe
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Mon Jan 20 21:40:38 2020 -0800

    log: start using file cache
    
    This patch modifies the log to use the file cache. New WAL segments are
    opened through the cache from the moment we switch to them, meaning
    there's a short period of time as they're being preallocated where they're
    opened outside the cache. Log index chunks are only used through the cache.
    
    The bulk of the patch is plumbing to get the file cache down from the server
    into the various log classes. Most "interesting" log-related tests have been
    modified to instantiate a cache while other unit tests have not, ensuring a
    mix of test coverage.
    
    One important semantic change to be aware of: it is now unsafe to delete a
    log's data or bootstrap a new copy of an existing tablet without first
    closing the old log. Thankfully we only took advantage of this in tests.
    
    I added a new gflag to use as a feature flag, in case things go south.
    
    Change-Id: I2c00f6b839a693e059fa2ce79abf347dbf83bdd0
    Reviewed-on: http://gerrit.cloudera.org:8080/15082
    Tested-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/consensus/consensus_peers-test.cc         | 17 ++++----
 src/kudu/consensus/consensus_queue-test.cc         |  1 +
 src/kudu/consensus/log-test-base.h                 | 18 ++++++--
 src/kudu/consensus/log-test.cc                     | 40 +++++++++++------
 src/kudu/consensus/log.cc                          | 50 +++++++++++++++++-----
 src/kudu/consensus/log.h                           |  5 ++-
 src/kudu/consensus/log_cache-test.cc               |  3 +-
 src/kudu/consensus/log_index-test.cc               | 10 ++++-
 src/kudu/consensus/log_index.cc                    | 28 ++++++++----
 src/kudu/consensus/log_index.h                     |  6 ++-
 src/kudu/consensus/log_reader.cc                   | 13 ++++--
 src/kudu/consensus/log_reader.h                    | 11 ++++-
 src/kudu/consensus/log_util.cc                     | 23 +++++++---
 src/kudu/consensus/log_util.h                      |  2 +
 src/kudu/consensus/mt-log-test.cc                  | 11 +++--
 src/kudu/consensus/raft_consensus_quorum-test.cc   |  6 ++-
 src/kudu/integration-tests/log_verifier.cc         |  9 +++-
 .../timestamp_advancement-itest.cc                 |  7 ++-
 src/kudu/integration-tests/ts_recovery-itest.cc    |  3 +-
 src/kudu/master/sys_catalog.cc                     |  7 +--
 src/kudu/tablet/tablet_bootstrap-test.cc           | 18 ++++----
 src/kudu/tablet/tablet_bootstrap.cc                | 13 ++++--
 src/kudu/tablet/tablet_bootstrap.h                 |  4 +-
 src/kudu/tablet/tablet_replica-test.cc             | 22 +++++-----
 src/kudu/tools/kudu-tool-test.cc                   |  3 +-
 src/kudu/tools/tool_action_local_replica.cc        | 14 ++++--
 src/kudu/tools/tool_action_perf.cc                 |  3 +-
 src/kudu/tools/tool_action_wal.cc                  |  5 ++-
 .../tserver/tablet_copy_source_session-test.cc     | 11 +++--
 src/kudu/tserver/ts_tablet_manager.cc              |  3 +-
 src/kudu/tserver/ts_tablet_manager.h               |  5 +--
 31 files changed, 259 insertions(+), 112 deletions(-)

diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 49c9812..d3afb92 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstddef>
+#include "kudu/consensus/consensus_peers.h"
+
 #include <memory>
 #include <string>
 #include <type_traits>
@@ -32,7 +33,6 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus-test-util.h"
 #include "kudu/consensus/consensus.pb.h"
-#include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/consensus_queue.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_util.h"
@@ -85,12 +85,13 @@ class ConsensusPeersTest : public KuduTest {
     ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
     ASSERT_OK(fs_manager_->Open());
     ASSERT_OK(Log::Open(options_,
-                       fs_manager_.get(),
-                       kTabletId,
-                       schema_,
-                       0, // schema_version
-                       NULL,
-                       &log_));
+                        fs_manager_.get(),
+                        /*file_cache*/nullptr,
+                        kTabletId,
+                        schema_,
+                        0, // schema_version
+                        /*metric_entity*/nullptr,
+                        &log_));
     clock_.reset(new clock::HybridClock());
     ASSERT_OK(clock_->Init());
 
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index eba226c..572fe31 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -96,6 +96,7 @@ class ConsensusQueueTest : public KuduTest {
     ASSERT_OK(fs_manager_->Open());
     CHECK_OK(log::Log::Open(log::LogOptions(),
                             fs_manager_.get(),
+                            /*file_cache*/nullptr,
                             kTestTablet,
                             schema_,
                             /*schema_version*/0,
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 62f3284..345492a 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -44,6 +44,7 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/env_util.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
@@ -51,6 +52,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+METRIC_DECLARE_entity(server);
 METRIC_DECLARE_entity(tablet);
 
 namespace kudu {
@@ -155,7 +157,14 @@ class LogTestBase : public KuduTest {
     current_index_ = kStartIndex;
     fs_manager_.reset(new FsManager(env_, FsManagerOpts(GetTestPath("fs_root"))));
     metric_registry_.reset(new MetricRegistry());
-    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(metric_registry_.get(), "log-test-base");
+    metric_entity_tablet_ = METRIC_ENTITY_tablet.Instantiate(
+        metric_registry_.get(), "tablet");
+    metric_entity_server_ = METRIC_ENTITY_server.Instantiate(
+        metric_registry_.get(), "server");
+    // Capacity was chosen arbitrarily: high enough to cache multiple files, but
+    // low enough to see some eviction.
+    file_cache_.reset(new FileCache("log-test-base", env_, 5, metric_entity_server_));
+    ASSERT_OK(file_cache_->Init());
     ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
     ASSERT_OK(fs_manager_->Open());
 
@@ -171,10 +180,11 @@ class LogTestBase : public KuduTest {
     Schema schema_with_ids = SchemaBuilder(schema_).Build();
     return Log::Open(options_,
                      fs_manager_.get(),
+                     file_cache_.get(),
                      kTestTablet,
                      schema_with_ids,
                      0, // schema_version
-                     metric_entity_.get(),
+                     metric_entity_tablet_.get(),
                      &log_);
   }
 
@@ -375,7 +385,9 @@ class LogTestBase : public KuduTest {
   const Schema schema_;
   std::unique_ptr<FsManager> fs_manager_;
   std::unique_ptr<MetricRegistry> metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
+  std::unique_ptr<FileCache> file_cache_;
+  scoped_refptr<MetricEntity> metric_entity_tablet_;
+  scoped_refptr<MetricEntity> metric_entity_server_;
   scoped_refptr<Log> log_;
   int64_t current_index_;
   LogOptions options_;
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 653ed00..3800554 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -43,7 +43,6 @@
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/opid_util.h"
-#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
@@ -53,6 +52,7 @@
 #include "kudu/util/debug/sanitizer_scopes.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
@@ -127,15 +127,11 @@ class LogTest : public LogTestBase {
                                        int first_repl_index,
                                        LogReader* reader) {
     string fqp = GetTestPath(strings::Substitute("wal-00000000$0", sequence_number));
-    unique_ptr<WritableFile> w_log_seg;
-    RETURN_NOT_OK(fs_manager_->env()->NewWritableFile(fqp, &w_log_seg));
-    unique_ptr<RWFile> r_log_seg;
-    RWFileOptions opts;
-    opts.mode = Env::MUST_EXIST;
-    RETURN_NOT_OK(fs_manager_->env()->NewRWFile(opts, fqp, &r_log_seg));
+    shared_ptr<RWFile> log_seg;
+    RETURN_NOT_OK(file_cache_->OpenFile<Env::MUST_CREATE>(fqp, &log_seg));
 
     scoped_refptr<ReadableLogSegment> readable_segment(
-        new ReadableLogSegment(fqp, shared_ptr<RWFile>(r_log_seg.release())));
+        new ReadableLogSegment(fqp, std::move(log_seg)));
 
     LogSegmentHeaderPB header;
     header.set_sequence_number(sequence_number);
@@ -350,8 +346,13 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
   // because it has a cached header.
   shared_ptr<LogReader> reader;
   ASSERT_OK(LogReader::Open(fs_manager_.get(),
-                            make_scoped_refptr(new LogIndex(env_, log_->log_dir_)),
-                            kTestTablet, nullptr, &reader));
+                            make_scoped_refptr(new LogIndex(env_,
+                                                            file_cache_.get(),
+                                                            log_->log_dir_)),
+                            kTestTablet,
+                            metric_entity_tablet_,
+                            file_cache_.get(),
+                            &reader));
   ASSERT_EQ(1, reader->num_segments());
 
   SegmentSequence segments;
@@ -412,7 +413,12 @@ TEST_P(LogTestOptionalCompression, TestSegmentRollover) {
   ASSERT_OK(log_->Close());
 
   shared_ptr<LogReader> reader;
-  ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
+  ASSERT_OK(LogReader::Open(fs_manager_.get(),
+                            /*index*/nullptr,
+                            kTestTablet,
+                            metric_entity_tablet_,
+                            file_cache_.get(),
+                            &reader));
   reader->GetSegmentsSnapshot(&segments);
 
   ASSERT_TRUE(segments.back()->HasFooter());
@@ -760,7 +766,12 @@ TEST_P(LogTestOptionalCompression, TestWriteManyBatches) {
     vector<scoped_refptr<ReadableLogSegment> > segments;
 
     shared_ptr<LogReader> reader;
-    ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
+    ASSERT_OK(LogReader::Open(fs_manager_.get(),
+                              /*index*/nullptr,
+                              kTestTablet,
+                              metric_entity_tablet_,
+                              file_cache_.get(),
+                              &reader));
     reader->GetSegmentsSnapshot(&segments);
 
     for (const scoped_refptr<ReadableLogSegment>& entry : segments) {
@@ -781,9 +792,10 @@ TEST_P(LogTestOptionalCompression, TestWriteManyBatches) {
 // seg004: 0.30 through 0.39
 TEST_P(LogTestOptionalCompression, TestLogReader) {
   LogReader reader(env_,
-                   scoped_refptr<LogIndex>(),
+                   /*index*/nullptr,
                    kTestTablet,
-                   nullptr);
+                   metric_entity_tablet_,
+                   file_cache_.get());
   reader.InitEmptyReaderForTests();
   ASSERT_OK(AppendNewEmptySegmentToReader(2, 10, &reader));
   ASSERT_OK(AppendNewEmptySegmentToReader(3, 20, &reader));
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 429036f..f9a6018 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -47,6 +47,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/fault_injection.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/kernel_stack_watchdog.h"
 #include "kudu/util/logging.h"
@@ -146,6 +147,8 @@ DEFINE_double(log_inject_io_error_on_preallocate_fraction, 0.0,
 TAG_FLAG(log_inject_io_error_on_preallocate_fraction, unsafe);
 TAG_FLAG(log_inject_io_error_on_preallocate_fraction, runtime);
 
+// Other flags.
+// -----------------------------
 DEFINE_int64(fs_wal_dir_reserved_bytes, -1,
              "Number of bytes to reserve on the log directory filesystem for "
              "non-Kudu usage. The default, which is represented by -1, is that "
@@ -155,7 +158,12 @@ DEFINE_int64(fs_wal_dir_reserved_bytes, -1,
              "are not currently supported");
 DEFINE_validator(fs_wal_dir_reserved_bytes, [](const char* /*n*/, int64_t v) { return v >= -1; });
 TAG_FLAG(fs_wal_dir_reserved_bytes, runtime);
-TAG_FLAG(fs_wal_dir_reserved_bytes, evolving);
+
+DEFINE_bool(fs_wal_use_file_cache, true,
+            "Whether to use the server-wide file cache for WAL segments and "
+            "WAL index chunks.");
+TAG_FLAG(fs_wal_use_file_cache, runtime);
+TAG_FLAG(fs_wal_use_file_cache, advanced);
 
 // Validate that log_min_segments_to_retain >= 1
 static bool ValidateLogsToRetain(const char* flagname, int value) {
@@ -173,7 +181,6 @@ namespace log {
 
 using consensus::CommitMsg;
 using consensus::ReplicateRefPtr;
-using env_util::OpenFileForRandom;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -634,6 +641,9 @@ Status SegmentAllocator::AllocateNewSegment() {
     allocation_state_ = kAllocationFinished;
   });
 
+  // We could create the new segment file through the cache, but that's tricky
+  // because of the file rename that'll happen later. So instead, we'll create
+  // it outside the cache now, then reopen via the cache when we switch to it.
   string tmp_suffix = Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
   string path_tmpl = JoinPathSegments(ctx_->log_dir, tmp_suffix);
   VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
@@ -676,6 +686,12 @@ Status SegmentAllocator::SwitchToAllocatedSegment(
     RETURN_NOT_OK(env->SyncDir(ctx_->log_dir));
   }
 
+  // Reopen the allocated segment file thru the file cache.
+  if (PREDICT_TRUE(ctx_->file_cache && FLAGS_fs_wal_use_file_cache)) {
+    RETURN_NOT_OK(ctx_->file_cache->OpenFile<Env::MUST_EXIST>(
+        new_segment_path, &next_segment_file_));
+  }
+
   // Create a new segment in memory.
   unique_ptr<WritableLogSegment> new_segment(
       new WritableLogSegment(new_segment_path, next_segment_file_));
@@ -702,9 +718,6 @@ Status SegmentAllocator::SwitchToAllocatedSegment(
 
   // Open the segment we just created in readable form; it is the caller's
   // responsibility to add it to the reader.
-  //
-  // TODO(todd): consider using a global FileCache here? With short log segments and
-  // lots of tablets, this file descriptor usage may add up.
   {
     scoped_refptr<ReadableLogSegment> readable_segment(
         new ReadableLogSegment(new_segment_path, std::move(next_segment_file_)));
@@ -748,6 +761,7 @@ const uint64_t Log::kInitialLogSegmentSequenceNumber = 0L;
 
 Status Log::Open(LogOptions options,
                  FsManager* fs_manager,
+                 FileCache* file_cache,
                  const string& tablet_id,
                  Schema schema,
                  uint32_t schema_version,
@@ -761,6 +775,7 @@ Status Log::Open(LogOptions options,
   ctx.metric_entity = metric_entity;
   ctx.metrics.reset(metric_entity ? new LogMetrics(metric_entity) : nullptr);
   ctx.fs_manager = fs_manager;
+  ctx.file_cache = file_cache;
   scoped_refptr<Log> new_log(new Log(std::move(options), std::move(ctx), std::move(schema),
                                      schema_version));
   RETURN_NOT_OK(new_log->Init());
@@ -781,14 +796,17 @@ Log::Log(LogOptions options, LogContext ctx, Schema schema, uint32_t schema_vers
 Status Log::Init() {
   CHECK_EQ(kLogInitialized, log_state_);
 
-  // Init the index
-  log_index_.reset(new LogIndex(ctx_.fs_manager->env(), ctx_.log_dir));
+  // Init the index.
+  log_index_.reset(new LogIndex(ctx_.fs_manager->env(),
+                                ctx_.file_cache,
+                                ctx_.log_dir));
 
   // Reader for previous segments.
   RETURN_NOT_OK(LogReader::Open(ctx_.fs_manager,
                                 log_index_,
                                 ctx_.tablet_id,
                                 ctx_.metric_entity.get(),
+                                ctx_.file_cache,
                                 &reader_));
 
   // The case where we are continuing an existing log.
@@ -988,7 +1006,7 @@ int GetPrefixSizeToGC(RetentionIndexes retention_indexes, const SegmentSequence&
 }
 
 void Log::GetSegmentsToGCUnlocked(RetentionIndexes retention_indexes,
-                                    SegmentSequence* segments_to_gc) const {
+                                  SegmentSequence* segments_to_gc) const {
   reader_->GetSegmentsSnapshot(segments_to_gc);
   segments_to_gc->resize(GetPrefixSizeToGC(retention_indexes, *segments_to_gc));
 }
@@ -1046,7 +1064,7 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
 
     // Now that they are no longer referenced by the Log, delete the files.
     *num_gced = 0;
-    for (const scoped_refptr<ReadableLogSegment>& segment : segments_to_delete) {
+    for (const auto& segment : segments_to_delete) {
       string ops_str;
       if (segment->HasFooter() && segment->footer().has_min_replicate_index()) {
         DCHECK(segment->footer().has_max_replicate_index());
@@ -1055,7 +1073,13 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
                              segment->footer().max_replicate_index());
       }
       LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path() << ops_str;
-      RETURN_NOT_OK(ctx_.fs_manager->env()->DeleteFile(segment->path()));
+      if (PREDICT_TRUE(ctx_.file_cache)) {
+        // Note: the segment files will only be deleted from disk when
+        // segments_to_delete goes out of scope.
+        RETURN_NOT_OK(ctx_.file_cache->DeleteFile(segment->path()));
+      } else {
+        RETURN_NOT_OK(ctx_.fs_manager->env()->DeleteFile(segment->path()));
+      }
       (*num_gced)++;
     }
 
@@ -1170,6 +1194,9 @@ Status Log::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) {
   }
   LOG(INFO) << Substitute("T $0 P $1: Deleting WAL directory at $2",
                           tablet_id, fs_manager->uuid(), wal_dir);
+  // We don't need to delete through the file cache; we're guaranteed that
+  // the log has been closed (though this invariant isn't verifiable here
+  // without additional plumbing).
   RETURN_NOT_OK_PREPEND(env->DeleteRecursively(wal_dir),
                         "Unable to recursively delete WAL dir for tablet " + tablet_id);
   return Status::OK();
@@ -1198,6 +1225,9 @@ Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& table
     return Status::OK();
   }
   VLOG(1) << kLogPrefix << "Deleting all files from renamed log recovery directory " << tmp_path;
+  // We don't need to delete through the file cache; we're guaranteed that
+  // the log has been closed (though this invariant isn't verifiable here
+  // without additional plumbing).
   RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteRecursively(tmp_path),
                         "Could not remove renamed recovery dir " + tmp_path);
   VLOG(1) << kLogPrefix << "Completed deletion of old log recovery files and directory "
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index b16d074..cd2907b 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -54,6 +54,7 @@
 namespace kudu {
 
 class CompressionCodec;
+class FileCache;
 class FsManager;
 class RWFile;
 
@@ -74,6 +75,7 @@ struct LogContext {
   scoped_refptr<MetricEntity> metric_entity;
   std::unique_ptr<LogMetrics> metrics;
   FsManager* fs_manager;
+  FileCache* file_cache;
 
   std::string LogPrefix() const;
 };
@@ -277,7 +279,8 @@ class Log : public RefCountedThreadSafe<Log> {
   // Opens or continues a log and sets 'log' to the newly built Log.
   // After a successful Open() the Log is ready to receive entries.
   static Status Open(LogOptions options,
-                     FsManager *fs_manager,
+                     FsManager* fs_manager,
+                     FileCache* file_cache,
                      const std::string& tablet_id,
                      Schema schema,
                      uint32_t schema_version,
diff --git a/src/kudu/consensus/log_cache-test.cc b/src/kudu/consensus/log_cache-test.cc
index 2c58e4b..8ebc97a 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -88,10 +88,11 @@ class LogCacheTest : public KuduTest {
     ASSERT_OK(fs_manager_->Open());
     CHECK_OK(log::Log::Open(log::LogOptions(),
                             fs_manager_.get(),
+                            /*file_cache*/nullptr,
                             kTestTablet,
                             schema_,
                             0, // schema_version
-                            nullptr,
+                            /*metric_entity*/nullptr,
                             &log_));
 
     CloseAndReopenCache(MinimumOpId());
diff --git a/src/kudu/consensus/log_index-test.cc b/src/kudu/consensus/log_index-test.cc
index b9eda61..9572b28 100644
--- a/src/kudu/consensus/log_index-test.cc
+++ b/src/kudu/consensus/log_index-test.cc
@@ -25,6 +25,8 @@
 #include "kudu/consensus/opid_util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/util/file_cache.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -37,9 +39,14 @@ using consensus::OpId;
 
 class LogIndexTest : public KuduTest {
  public:
+  LogIndexTest()
+      : file_cache_("test", env_, 1, /*metric_entity*/ nullptr),
+        index_(new LogIndex(env_, &file_cache_, test_dir_)) {
+  }
+
   virtual void SetUp() OVERRIDE {
     KuduTest::SetUp();
-    index_ = new LogIndex(env_, test_dir_);
+    ASSERT_OK(file_cache_.Init());
   }
 
  protected:
@@ -68,6 +75,7 @@ class LogIndexTest : public KuduTest {
     EXPECT_TRUE(s.IsNotFound()) << s.ToString();
   }
 
+  FileCache file_cache_;
   scoped_refptr<LogIndex> index_;
 };
 
diff --git a/src/kudu/consensus/log_index.cc b/src/kudu/consensus/log_index.cc
index 743a297..a891dc5 100644
--- a/src/kudu/consensus/log_index.cc
+++ b/src/kudu/consensus/log_index.cc
@@ -37,6 +37,7 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/consensus/opid_util.h"
@@ -45,8 +46,12 @@
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/env.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/slice.h"
 
+DECLARE_bool(fs_wal_use_file_cache);
+
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -82,14 +87,14 @@ class LogIndex::IndexChunk : public RefCountedThreadSafe<LogIndex::IndexChunk> {
   IndexChunk(Env* env, string path);
   ~IndexChunk() = default;
 
-  Status Open();
+  Status Open(FileCache* file_cache);
   Status GetEntry(int entry_index, PhysicalEntry* ret) const;
   Status SetEntry(int entry_index, const PhysicalEntry& entry);
 
  private:
   Env* env_;
   const string path_;
-  unique_ptr<RWFile> file_;
+  shared_ptr<RWFile> file_;
 };
 
 LogIndex::IndexChunk::IndexChunk(Env* env, string path)
@@ -97,10 +102,16 @@ LogIndex::IndexChunk::IndexChunk(Env* env, string path)
       path_(std::move(path)) {
 }
 
-Status LogIndex::IndexChunk::Open() {
-  RWFileOptions opts;
-  opts.mode = Env::CREATE_OR_OPEN;
-  RETURN_NOT_OK(env_->NewRWFile(opts, path_, &file_));
+Status LogIndex::IndexChunk::Open(FileCache* file_cache) {
+  if (PREDICT_TRUE(file_cache && FLAGS_fs_wal_use_file_cache)) {
+    RETURN_NOT_OK(file_cache->OpenFile<Env::CREATE_OR_OPEN>(path_, &file_));
+  } else {
+    unique_ptr<RWFile> f;
+    RWFileOptions opts;
+    opts.mode = Env::CREATE_OR_OPEN;
+    RETURN_NOT_OK(env_->NewRWFile(opts, path_, &f));
+    file_.reset(f.release());
+  }
   return file_->Truncate(kChunkFileSize);
 }
 
@@ -124,8 +135,9 @@ Status LogIndex::IndexChunk::SetEntry(int entry_index, const PhysicalEntry& entr
 // LogIndex
 ////////////////////////////////////////////////////////////
 
-LogIndex::LogIndex(Env* env, string base_dir)
+LogIndex::LogIndex(Env* env, FileCache* file_cache, string base_dir)
     : env_(env),
+      file_cache_(file_cache),
       base_dir_(std::move(base_dir)) {
 }
 
@@ -140,7 +152,7 @@ Status LogIndex::OpenChunk(int64_t chunk_idx, scoped_refptr<IndexChunk>* chunk)
   string path = GetChunkPath(chunk_idx);
 
   scoped_refptr<IndexChunk> new_chunk(new IndexChunk(env_, path));
-  RETURN_NOT_OK(new_chunk->Open());
+  RETURN_NOT_OK(new_chunk->Open(file_cache_));
   *chunk = std::move(new_chunk);
   return Status::OK();
 }
diff --git a/src/kudu/consensus/log_index.h b/src/kudu/consensus/log_index.h
index 44f4734..1a4a725 100644
--- a/src/kudu/consensus/log_index.h
+++ b/src/kudu/consensus/log_index.h
@@ -29,6 +29,7 @@
 namespace kudu {
 
 class Env;
+class FileCache;
 
 namespace log {
 
@@ -62,7 +63,7 @@ struct LogIndexEntry {
 // See .cc file for implementation notes.
 class LogIndex : public RefCountedThreadSafe<LogIndex> {
  public:
-  LogIndex(Env* env, std::string base_dir);
+  LogIndex(Env* env, FileCache* file_cache, std::string base_dir);
 
   // Record an index entry in the index.
   Status AddEntry(const LogIndexEntry& entry);
@@ -98,6 +99,9 @@ class LogIndex : public RefCountedThreadSafe<LogIndex> {
   // Environment with which to do file I/O.
   Env* env_;
 
+  // Optional cache thru which index files are opened.
+  FileCache* file_cache_;
+
   // The base directory where index files are located.
   const std::string base_dir_;
 
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index f4367ee..0721c70 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -84,8 +84,10 @@ Status LogReader::Open(Env* env,
                        const scoped_refptr<LogIndex>& index,
                        const string& tablet_id,
                        const scoped_refptr<MetricEntity>& metric_entity,
+                       FileCache* file_cache,
                        shared_ptr<LogReader>* reader) {
-  auto log_reader = LogReader::make_shared(env, index, tablet_id, metric_entity);
+  auto log_reader = LogReader::make_shared(env, index, tablet_id,
+                                           metric_entity, file_cache);
 
   RETURN_NOT_OK_PREPEND(log_reader->Init(tablet_wal_dir),
                         "Unable to initialize log reader");
@@ -97,16 +99,19 @@ Status LogReader::Open(FsManager* fs_manager,
                        const scoped_refptr<LogIndex>& index,
                        const std::string& tablet_id,
                        const scoped_refptr<MetricEntity>& metric_entity,
+                       FileCache* file_cache,
                        std::shared_ptr<LogReader>* reader) {
   return LogReader::Open(fs_manager->env(), fs_manager->GetTabletWalDir(tablet_id),
-                         index, tablet_id, metric_entity, reader);
+                         index, tablet_id, metric_entity, file_cache, reader);
 }
 
 LogReader::LogReader(Env* env,
                      scoped_refptr<LogIndex> index,
                      string tablet_id,
-                     const scoped_refptr<MetricEntity>& metric_entity)
+                     const scoped_refptr<MetricEntity>& metric_entity,
+                     FileCache* file_cache)
     : env_(env),
+      file_cache_(file_cache),
       log_index_(std::move(index)),
       tablet_id_(std::move(tablet_id)),
       state_(kLogReaderInitialized) {
@@ -146,7 +151,7 @@ Status LogReader::Init(const string& tablet_wal_path) {
     if (HasPrefixString(log_file, FsManager::kWalFileNamePrefix)) {
       string fqp = JoinPathSegments(tablet_wal_path, log_file);
       scoped_refptr<ReadableLogSegment> segment;
-      Status s = ReadableLogSegment::Open(env_, fqp, &segment);
+      Status s = ReadableLogSegment::Open(env_, file_cache_, fqp, &segment);
       if (s.IsUninitialized()) {
         // This indicates that the segment was created but the writer
         // crashed before the header was successfully written. In this
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index ca80179..2809c1a 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -34,6 +34,7 @@ namespace kudu {
 
 class Counter;
 class Env;
+class FileCache;
 class FsManager;
 class Histogram;
 class MetricEntity;
@@ -73,6 +74,7 @@ class LogReader : public enable_make_shared<LogReader> {
                      const scoped_refptr<LogIndex>& index,
                      const std::string& tablet_id,
                      const scoped_refptr<MetricEntity>& metric_entity,
+                     FileCache* file_cache,
                      std::shared_ptr<LogReader>* reader);
 
   // Same as above, but will use `fs_manager` to determine the default WAL dir
@@ -81,6 +83,7 @@ class LogReader : public enable_make_shared<LogReader> {
                      const scoped_refptr<LogIndex>& index,
                      const std::string& tablet_id,
                      const scoped_refptr<MetricEntity>& metric_entity,
+                     FileCache* file_cache,
                      std::shared_ptr<LogReader>* reader);
 
   // Return the minimum replicate index that is retained in the currently available
@@ -120,8 +123,11 @@ class LogReader : public enable_make_shared<LogReader> {
   std::string ToString() const;
 
  protected:
-  LogReader(Env* env, scoped_refptr<LogIndex> index, std::string tablet_id,
-            const scoped_refptr<MetricEntity>& metric_entity);
+  LogReader(Env* env,
+            scoped_refptr<LogIndex> index,
+            std::string tablet_id,
+            const scoped_refptr<MetricEntity>& metric_entity,
+            FileCache* file_cache);
 
  private:
   FRIEND_TEST(LogTestOptionalCompression, TestLogReader);
@@ -184,6 +190,7 @@ class LogReader : public enable_make_shared<LogReader> {
   void InitEmptyReaderForTests();
 
   Env* env_;
+  FileCache* file_cache_;
   const scoped_refptr<LogIndex> log_index_;
   const std::string tablet_id_;
 
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 9783630..7fcfc1e 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -42,6 +42,7 @@
 #include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/fault_injection.h"
+#include "kudu/util/file_cache.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/pb_util.h"
@@ -66,6 +67,8 @@ DEFINE_double(fault_crash_before_write_log_segment_header, 0.0,
               "Fraction of the time we will crash just before writing the log segment header");
 TAG_FLAG(fault_crash_before_write_log_segment_header, unsafe);
 
+DECLARE_bool(fs_wal_use_file_cache);
+
 using kudu::consensus::OpId;
 using std::string;
 using std::shared_ptr;
@@ -258,16 +261,24 @@ Status LogEntryReader::MakeCorruptionStatus(const Status& status) const {
 ////////////////////////////////////////////////////////////
 
 Status ReadableLogSegment::Open(Env* env,
+                                FileCache* file_cache,
                                 const string& path,
                                 scoped_refptr<ReadableLogSegment>* segment) {
   VLOG(1) << "Parsing wal segment: " << path;
-  unique_ptr<RWFile> file;
-  RWFileOptions opts;
-  opts.mode = Env::MUST_EXIST;
-  RETURN_NOT_OK_PREPEND(env->NewRWFile(opts, path, &file),
-                        "Unable to open file for reading");
+  shared_ptr<RWFile> file;
+  if (PREDICT_TRUE(file_cache && FLAGS_fs_wal_use_file_cache)) {
+    RETURN_NOT_OK_PREPEND(file_cache->OpenFile<Env::MUST_EXIST>(path, &file),
+                          "unable to open file for reading");
+  } else {
+    unique_ptr<RWFile> f;
+    RWFileOptions opts;
+    opts.mode = Env::MUST_EXIST;
+    RETURN_NOT_OK_PREPEND(env->NewRWFile(opts, path, &f),
+                          "Unable to open file for reading");
+    file.reset(f.release());
+  }
 
-  segment->reset(new ReadableLogSegment(path, shared_ptr<RWFile>(file.release())));
+  segment->reset(new ReadableLogSegment(path, std::move(file)));
   RETURN_NOT_OK_PREPEND((*segment)->Init(), "Unable to initialize segment");
   return Status::OK();
 }
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 27e8159..62571f1 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -44,6 +44,7 @@ DECLARE_bool(log_force_fsync_all);
 namespace kudu {
 
 class CompressionCodec;
+class FileCache;
 
 namespace log {
 
@@ -177,6 +178,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
  public:
   // Factory method to construct a ReadableLogSegment from a file on the FS.
   static Status Open(Env* env,
+                     FileCache* file_cache,
                      const std::string& path,
                      scoped_refptr<ReadableLogSegment>* segment);
 
diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc
index cb9329f..5a540d4 100644
--- a/src/kudu/consensus/mt-log-test.cc
+++ b/src/kudu/consensus/mt-log-test.cc
@@ -22,11 +22,11 @@
 #include <memory>
 #include <mutex>
 #include <ostream>
+#include <utility>
 #include <thread>
 #include <vector>
 
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -43,7 +43,6 @@
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/ref_counted_replicate.h"
 #include "kudu/gutil/bind.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -51,7 +50,6 @@
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/locks.h"
-#include "kudu/util/metrics.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/status_callback.h"
@@ -204,7 +202,12 @@ class MultiThreadedLogTest : public LogTestBase {
 
   void VerifyLog() {
     shared_ptr<LogReader> reader;
-    ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
+    ASSERT_OK(LogReader::Open(fs_manager_.get(),
+                              /*index*/nullptr,
+                              kTestTablet,
+                              metric_entity_tablet_,
+                              file_cache_.get(),
+                              &reader));
     SegmentSequence segments;
     reader->GetSegmentsSnapshot(&segments);
 
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 4615ff5..3a60f68 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -154,10 +154,11 @@ class RaftConsensusQuorumTest : public KuduTest {
       scoped_refptr<Log> log;
       RETURN_NOT_OK(Log::Open(LogOptions(),
                               fs_manager.get(),
+                              /*file_cache*/nullptr,
                               kTestTablet,
                               schema_,
                               0, // schema_version
-                              nullptr,
+                              /*metric_entity*/nullptr,
                               &log));
       logs_.emplace_back(std::move(log));
       fs_managers_.push_back(std::move(fs_manager));
@@ -435,9 +436,10 @@ class RaftConsensusQuorumTest : public KuduTest {
     log->Close();
     shared_ptr<LogReader> log_reader;
     ASSERT_OK(log::LogReader::Open(fs_managers_[idx].get(),
-                                   scoped_refptr<log::LogIndex>(),
+                                   /*index*/nullptr,
                                    kTestTablet,
                                    metric_entity_.get(),
+                                   /*file_cache*/nullptr,
                                    &log_reader));
     log::SegmentSequence segments;
     log_reader->GetSegmentsSnapshot(&segments);
diff --git a/src/kudu/integration-tests/log_verifier.cc b/src/kudu/integration-tests/log_verifier.cc
index 48be779..df21938 100644
--- a/src/kudu/integration-tests/log_verifier.cc
+++ b/src/kudu/integration-tests/log_verifier.cc
@@ -75,8 +75,13 @@ Status LogVerifier::ScanForCommittedOpIds(int ts_idx, const string& tablet_id,
 
   shared_ptr<LogReader> reader;
   const string wal_dir = JoinPathSegments(inspector_->WalDirForTS(ts_idx), tablet_id);
-  RETURN_NOT_OK(LogReader::Open(env_, wal_dir, scoped_refptr<log::LogIndex>(), tablet_id,
-                                scoped_refptr<MetricEntity>(), &reader));
+  RETURN_NOT_OK(LogReader::Open(env_,
+                                wal_dir,
+                                /*index*/nullptr,
+                                tablet_id,
+                                /*metric_entity*/nullptr,
+                                /*file_cache*/nullptr,
+                                &reader));
   log::SegmentSequence segs;
   reader->GetSegmentsSnapshot(&segs);
   unique_ptr<log::LogEntryPB> entry;
diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc
index 8166052..5924e3b 100644
--- a/src/kudu/integration-tests/timestamp_advancement-itest.cc
+++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc
@@ -182,8 +182,11 @@ class TimestampAdvancementITest : public MiniClusterITestBase {
     shared_ptr<LogReader> reader;
     RETURN_NOT_OK(LogReader::Open(
        ts->server()->fs_manager(),
-       scoped_refptr<log::LogIndex>(), tablet_id,
-       scoped_refptr<MetricEntity>(), &reader));
+       /*index*/nullptr,
+       tablet_id,
+       /*metric_entity*/nullptr,
+       ts->server()->file_cache(),
+       &reader));
     log::SegmentSequence segs;
     reader->GetSegmentsSnapshot(&segs);
     unique_ptr<log::LogEntryPB> entry;
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index 36b9ea8..04541ac 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -601,10 +601,11 @@ TEST_P(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
       scoped_refptr<Log> log;
       ASSERT_OK(Log::Open(LogOptions(),
                           fs_manager.get(),
+                          /*file_cache*/nullptr,
                           tablet_id,
                           SchemaBuilder(GetSimpleTestSchema()).Build(),
                           0, // schema_version
-                          nullptr,
+                          /*metric_entity*/nullptr,
                           &log));
 
       // Write a series of negative OpIds.
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index e716c23..c095096 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -419,12 +419,13 @@ Status SysCatalogTable::SetupTablet(
       cmeta->CommittedConfig(),
       master_->clock(),
       master_->mem_tracker(),
-      scoped_refptr<rpc::ResultTracker>(),
+      /*result_tracker*/nullptr,
       metric_registry_,
+      master_->file_cache(),
       tablet_replica_,
+      tablet_replica_->log_anchor_registry(),
       &tablet,
       &log,
-      tablet_replica_->log_anchor_registry(),
       &consensus_info), "failed to bootstrap system catalog");
 
   // TODO(matteo): Do we have a setSplittable(false) or something from the
@@ -435,7 +436,7 @@ Status SysCatalogTable::SetupTablet(
       tablet,
       master_->clock(),
       master_->messenger(),
-      scoped_refptr<rpc::ResultTracker>(),
+      /*result_tracker*/nullptr,
       log,
       master_->tablet_prepare_pool(),
       master_->dns_resolver()), "failed to start system catalog replica");
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index fd29b29..46229f7 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -96,9 +96,6 @@ using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
-
-class MemTracker;
-
 namespace tablet {
 
 class BootstrapTest : public LogTestBase {
@@ -160,19 +157,24 @@ class BootstrapTest : public LogTestBase {
     scoped_refptr<ConsensusMetadata> cmeta;
     RETURN_NOT_OK(cmeta_manager_->Load(meta->tablet_id(), &cmeta));
 
+    // Close the existing log to evict any segments from the file cache so that
+    // bootstrap won't access any stale (cached) segments.
+    RETURN_NOT_OK(log_->Close());
+
     scoped_refptr<LogAnchorRegistry> log_anchor_registry(new LogAnchorRegistry());
     // Now attempt to recover the log
     RETURN_NOT_OK(BootstrapTablet(
         meta,
         cmeta->CommittedConfig(),
         clock_.get(),
-        shared_ptr<MemTracker>(),
-        scoped_refptr<rpc::ResultTracker>(),
-        nullptr,
-        nullptr, // no status listener
+        /*mem_tracker*/nullptr,
+        /*result_tracker*/nullptr,
+        metric_registry_.get(),
+        file_cache_.get(),
+        /*tablet_replica*/nullptr,
+        std::move(log_anchor_registry),
         tablet,
         &log_,
-        log_anchor_registry,
         boot_info));
 
     return Status::OK();
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index baf1537..1f3e8b3 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -197,6 +197,7 @@ class TabletBootstrap {
                   shared_ptr<MemTracker> mem_tracker,
                   scoped_refptr<ResultTracker> result_tracker,
                   MetricRegistry* metric_registry,
+                  FileCache* file_cache,
                   scoped_refptr<TabletReplica> tablet_replica,
                   scoped_refptr<LogAnchorRegistry> log_anchor_registry);
 
@@ -377,6 +378,7 @@ class TabletBootstrap {
   shared_ptr<MemTracker> mem_tracker_;
   scoped_refptr<rpc::ResultTracker> result_tracker_;
   MetricRegistry* metric_registry_;
+  FileCache* file_cache_;
   scoped_refptr<TabletReplica> tablet_replica_;
   unique_ptr<tablet::Tablet> tablet_;
   const scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
@@ -445,10 +447,11 @@ Status BootstrapTablet(scoped_refptr<TabletMetadata> tablet_meta,
                        shared_ptr<MemTracker> mem_tracker,
                        scoped_refptr<ResultTracker> result_tracker,
                        MetricRegistry* metric_registry,
+                       FileCache* file_cache,
                        scoped_refptr<TabletReplica> tablet_replica,
+                       scoped_refptr<log::LogAnchorRegistry> log_anchor_registry,
                        shared_ptr<tablet::Tablet>* rebuilt_tablet,
                        scoped_refptr<log::Log>* rebuilt_log,
-                       scoped_refptr<log::LogAnchorRegistry> log_anchor_registry,
                        ConsensusBootstrapInfo* consensus_info) {
   TRACE_EVENT1("tablet", "BootstrapTablet",
                "tablet_id", tablet_meta->tablet_id());
@@ -458,6 +461,7 @@ Status BootstrapTablet(scoped_refptr<TabletMetadata> tablet_meta,
                             std::move(mem_tracker),
                             std::move(result_tracker),
                             metric_registry,
+                            file_cache,
                             std::move(tablet_replica),
                             std::move(log_anchor_registry));
   RETURN_NOT_OK(bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, consensus_info));
@@ -494,6 +498,7 @@ TabletBootstrap::TabletBootstrap(
     shared_ptr<MemTracker> mem_tracker,
     scoped_refptr<ResultTracker> result_tracker,
     MetricRegistry* metric_registry,
+    FileCache* file_cache,
     scoped_refptr<TabletReplica> tablet_replica,
     scoped_refptr<LogAnchorRegistry> log_anchor_registry)
     : tablet_meta_(std::move(tablet_meta)),
@@ -502,6 +507,7 @@ TabletBootstrap::TabletBootstrap(
       mem_tracker_(std::move(mem_tracker)),
       result_tracker_(std::move(result_tracker)),
       metric_registry_(metric_registry),
+      file_cache_(file_cache),
       tablet_replica_(std::move(tablet_replica)),
       log_anchor_registry_(std::move(log_anchor_registry)) {}
 
@@ -720,17 +726,16 @@ Status TabletBootstrap::OpenLogReaderInRecoveryDir() {
   const string recovery_dir = fs_manager->GetTabletWalRecoveryDir(tablet_id);
   RETURN_NOT_OK_PREPEND(LogReader::Open(fs_manager->env(), recovery_dir, log_index, tablet_id,
                                         tablet_->GetMetricEntity().get(),
+                                        file_cache_,
                                         &log_reader_),
                         "Could not open LogReader. Reason");
   return Status::OK();
 }
 
 Status TabletBootstrap::OpenNewLog() {
-  OpId init;
-  init.set_term(0);
-  init.set_index(0);
   RETURN_NOT_OK(Log::Open(LogOptions(),
                           tablet_->metadata()->fs_manager(),
+                          file_cache_,
                           tablet_->tablet_id(),
                           *tablet_->schema(),
                           tablet_->metadata()->schema_version(),
diff --git a/src/kudu/tablet/tablet_bootstrap.h b/src/kudu/tablet/tablet_bootstrap.h
index 5bfa5a5..1bcdf24 100644
--- a/src/kudu/tablet/tablet_bootstrap.h
+++ b/src/kudu/tablet/tablet_bootstrap.h
@@ -23,6 +23,7 @@
 
 namespace kudu {
 
+class FileCache;
 class MemTracker;
 class MetricRegistry;
 
@@ -63,10 +64,11 @@ Status BootstrapTablet(scoped_refptr<TabletMetadata> tablet_meta,
                        std::shared_ptr<MemTracker> mem_tracker,
                        scoped_refptr<rpc::ResultTracker> result_tracker,
                        MetricRegistry* metric_registry,
+                       FileCache* file_cache,
                        scoped_refptr<TabletReplica> tablet_replica,
+                       scoped_refptr<log::LogAnchorRegistry> log_anchor_registry,
                        std::shared_ptr<Tablet>* rebuilt_tablet,
                        scoped_refptr<log::Log>* rebuilt_log,
-                       scoped_refptr<log::LogAnchorRegistry> log_anchor_registry,
                        consensus::ConsensusBootstrapInfo* consensus_info);
 
 }  // namespace tablet
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index b1bf7df..3d60c47 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -28,7 +28,6 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/clock/clock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
@@ -109,9 +108,6 @@ using std::string;
 using std::unique_ptr;
 
 namespace kudu {
-
-class MemTracker;
-
 namespace tablet {
 
 static Schema GetTestSchema() {
@@ -181,9 +177,14 @@ class TabletReplicaTest : public KuduTabletTest {
 
   Status StartReplica(const ConsensusBootstrapInfo& info) {
     scoped_refptr<Log> log;
-    RETURN_NOT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
-                            *tablet()->schema(), tablet()->metadata()->schema_version(),
-                            metric_entity_.get(), &log));
+    RETURN_NOT_OK(Log::Open(LogOptions(),
+                            fs_manager(),
+                            /*file_cache*/nullptr,
+                            tablet()->tablet_id(),
+                            *tablet()->schema(),
+                            tablet()->metadata()->schema_version(),
+                            metric_entity_.get(),
+                            &log));
     tablet_replica_->SetBootstrapping();
     return tablet_replica_->Start(info,
                                   tablet(),
@@ -226,13 +227,14 @@ class TabletReplicaTest : public KuduTabletTest {
     ASSERT_OK(BootstrapTablet(tablet_replica_->tablet_metadata(),
                               cmeta->CommittedConfig(),
                               clock(),
-                              shared_ptr<MemTracker>(),
-                              scoped_refptr<ResultTracker>(),
+                              /*mem_tracker*/nullptr,
+                              /*result_tracker*/nullptr,
                               &metric_registry_,
+                              /*file_cache*/nullptr,
                               tablet_replica_,
+                              tablet_replica_->log_anchor_registry(),
                               &tablet,
                               &log,
-                              tablet_replica_->log_anchor_registry(),
                               &bootstrap_info));
     ASSERT_OK(tablet_replica_->Start(bootstrap_info,
                                      tablet,
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 0c47957..fd8b66c 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -1586,10 +1586,11 @@ TEST_F(ToolTest, TestWalDump) {
     scoped_refptr<Log> log;
     ASSERT_OK(Log::Open(LogOptions(),
                         &fs,
+                        /*file_cache*/nullptr,
                         kTestTablet,
                         kSchemaWithIds,
                         0, // schema_version
-                        scoped_refptr<MetricEntity>(),
+                        /*metric_entity*/nullptr,
                         &log));
 
     OpId opid = consensus::MakeOpId(1, 1);
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 347d1b6..36b1005 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -50,6 +50,7 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/fs/io_context.h"
 #include "kudu/gutil/map-util.h"
@@ -186,8 +187,12 @@ Status ParseHostPortString(const string& hostport_str, HostPort* hostport) {
 Status FindLastLoggedOpId(FsManager* fs, const string& tablet_id,
                           OpId* last_logged_opid) {
   shared_ptr<LogReader> reader;
-  RETURN_NOT_OK(LogReader::Open(fs, scoped_refptr<log::LogIndex>(), tablet_id,
-                                scoped_refptr<MetricEntity>(), &reader));
+  RETURN_NOT_OK(LogReader::Open(fs,
+                                /*index*/nullptr,
+                                tablet_id,
+                                /*metric_entity*/nullptr,
+                                /*file_cache*/nullptr,
+                                &reader));
   SegmentSequence segs;
   reader->GetSegmentsSnapshot(&segs);
   // Reverse iterate the segments to find the 'last replicated' entry quickly.
@@ -521,9 +526,10 @@ Status DumpWals(const RunnerContext& context) {
 
   shared_ptr<LogReader> reader;
   RETURN_NOT_OK(LogReader::Open(fs_manager.get(),
-                                scoped_refptr<LogIndex>(),
+                                /*index*/nullptr,
                                 tablet_id,
-                                scoped_refptr<MetricEntity>(),
+                                /*metric_entity*/nullptr,
+                                /*file_cache*/nullptr,
                                 &reader));
 
   SegmentSequence segments;
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index c31f19d..c961c8a 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -829,10 +829,11 @@ Status TabletScan(const RunnerContext& context) {
                                         /*mem_tracker=*/ nullptr,
                                         /*result_tracker=*/ nullptr,
                                         /*metric_registry=*/ nullptr,
+                                        /*file_cache=*/ nullptr,
                                         /*tablet_replica=*/ nullptr,
+                                        std::move(registry),
                                         &tablet,
                                         &log,
-                                        std::move(registry),
                                         &cbi));
 
   // Tablet has been bootstrapped and opened. We can now scan it.
diff --git a/src/kudu/tools/tool_action_wal.cc b/src/kudu/tools/tool_action_wal.cc
index 0c2a600..a27ab91 100644
--- a/src/kudu/tools/tool_action_wal.cc
+++ b/src/kudu/tools/tool_action_wal.cc
@@ -44,7 +44,10 @@ Status Dump(const RunnerContext& context) {
   const string& segment_path = FindOrDie(context.required_args, kPathArg);
 
   scoped_refptr<ReadableLogSegment> segment;
-  RETURN_NOT_OK(ReadableLogSegment::Open(Env::Default(), segment_path, &segment));
+  RETURN_NOT_OK(ReadableLogSegment::Open(Env::Default(),
+                                         /*file_cache*/nullptr,
+                                         segment_path,
+                                         &segment));
   RETURN_NOT_OK(PrintSegment(segment));
   return Status::OK();
 }
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 8aa5eec..1041d9b 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -14,7 +14,8 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "kudu/tablet/tablet-test-util.h"
+
+#include "kudu/tserver/tablet_copy_source_session.h"
 
 #include <cstdint>
 #include <memory>
@@ -26,7 +27,6 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/clock/clock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
@@ -52,13 +52,13 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/transactions/transaction.h"
 #include "kudu/tablet/transactions/write_transaction.h"
 #include "kudu/tserver/tablet_copy.pb.h"
-#include "kudu/tserver/tablet_copy_source_session.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/crc.h"
@@ -134,7 +134,10 @@ class TabletCopyTest : public KuduTabletTest {
  protected:
   void SetUpTabletReplica() {
     scoped_refptr<Log> log;
-    ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
+    ASSERT_OK(Log::Open(LogOptions(),
+                        fs_manager(),
+                        /*file_cache=*/ nullptr,
+                        tablet()->tablet_id(),
                         *tablet()->schema(),
                         /*schema_version=*/ 0,
                         /*metric_entity=*/ nullptr,
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index f96d45b..ee578ba 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1124,10 +1124,11 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletReplica>& replica,
                         server_->mem_tracker(),
                         server_->result_tracker(),
                         metric_registry_,
+                        server_->file_cache(),
                         replica,
+                        replica->log_anchor_registry(),
                         &tablet,
                         &log,
-                        replica->log_anchor_registry(),
                         &bootstrap_info);
     if (!s.ok()) {
       LOG(ERROR) << LogPrefix(tablet_id) << "Tablet failed to bootstrap: "
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 68cc4b7..01f3518 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -49,16 +49,13 @@ class optional;
 }
 
 namespace kudu {
-class TableExtraConfigPB;
-}  // namespace kudu
-
-namespace kudu {
 
 class FsManager;
 class NodeInstancePB;
 class Partition;
 class PartitionSchema;
 class Schema;
+class TableExtraConfigPB;
 class ThreadPool;
 
 namespace consensus {