You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/02/20 23:54:15 UTC

[impala] 02/02: IMPALA-8690 (prep 2): Switch DataCache to Impala's cache implementation

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 091faebe61645c9f86f59bc67668688f15a2fc7c
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Feb 5 17:23:20 2020 -0800

    IMPALA-8690 (prep 2): Switch DataCache to Impala's cache implementation
    
    In a previous change, the Kudu caching code from be/src/kudu/util was
    copied to be/src/util/cache. This gets that code workable for Impala
    and uses it for the DataCache.
    
    This involves the following changes:
    1. Fix up includes to point to Impala's cache implementation
    2. Remove NVM Cache support and code (not used by Impala)
    3. Remove CacheMetrics code (not used by Impala)
    4. Move the cache code to the impala namespace
    5. Modify cache-bench and cache-test to use Impala's backend
       test infrastructure
    6. Switch DataCache to use be/src/util/cache
    
    These are only boilerplate changes. The cache implementation has not
    meaningfully changed.
    
    Testing:
     - Ran core tests
     - The modified version of Kudu's cache-test passes
     - The modified version of Kudu's cache-bench runs successfully
    
    Change-Id: I917f8352c9276373dd2761af986bf3487855271c
    Reviewed-on: http://gerrit.cloudera.org:8080/15170
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/CMakeLists.txt                |   2 +
 be/src/runtime/io/data-cache.cc  |  16 +-
 be/src/runtime/io/data-cache.h   |  12 +-
 be/src/util/CMakeLists.txt       |   2 +
 be/src/util/cache/CMakeLists.txt |  12 +-
 be/src/util/cache/cache-bench.cc |  24 +-
 be/src/util/cache/cache-test.cc  |  80 +----
 be/src/util/cache/cache.cc       |  98 +-----
 be/src/util/cache/cache.h        |  12 +-
 be/src/util/cache/nvm_cache.cc   | 719 ---------------------------------------
 be/src/util/cache/nvm_cache.h    |  45 ---
 bin/rat_exclude_files.txt        |   1 -
 12 files changed, 71 insertions(+), 952 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index c2d3ea6..42fa315 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -416,6 +416,7 @@ set (IMPALA_LIBS
   token_proto
   Udf
   Util
+  UtilCache
 )
 
 set (IMPALA_LINK_LIBS
@@ -454,6 +455,7 @@ set (UNIFIED_TEST_LIBS
   SchedulingTests
   ServiceTests
   UtilTests
+  UtilCacheTests
 )
 set (UNIFIED_TEST_LINK_LIBS
   ${WL_START_GROUP}
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 85dd277..4f74566 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -29,7 +29,6 @@
 
 #include "exec/kudu-util.h"
 #include "kudu/util/async_logger.h"
-#include "kudu/util/cache.h"
 #include "kudu/util/env.h"
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/locks.h"
@@ -40,6 +39,7 @@
 #include "gutil/strings/split.h"
 #include "gutil/walltime.h"
 #include "util/bit-util.h"
+#include "util/cache/cache.h"
 #include "util/error-util.h"
 #include "util/filesystem-util.h"
 #include "util/hash-util.h"
@@ -446,7 +446,7 @@ DataCache::Partition::Partition(
     capacity_(max<int64_t>(capacity, PAGE_SIZE)),
     max_opened_files_(max_opened_files),
     meta_cache_(
-        kudu::NewCache<kudu::Cache::EvictionPolicy::LRU, kudu::Cache::MemoryType::DRAM>(
+        NewCache<Cache::EvictionPolicy::LRU, Cache::MemoryType::DRAM>(
             capacity_, path_)) {}
 
 DataCache::Partition::~Partition() {
@@ -553,8 +553,7 @@ int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to
     uint8_t* buffer) {
   DCHECK(!closed_);
   Slice key = cache_key.ToSlice();
-  kudu::Cache::UniqueHandle handle(
-      meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE));
+  Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
 
   if (handle.get() == nullptr) {
     if (tracer_ != nullptr) {
@@ -589,7 +588,7 @@ int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to
 }
 
 bool DataCache::Partition::HandleExistingEntry(const Slice& key,
-    const kudu::Cache::UniqueHandle& handle, const uint8_t* buffer, int64_t buffer_len) {
+    const Cache::UniqueHandle& handle, const uint8_t* buffer, int64_t buffer_len) {
   // Unpack the cache entry.
   CacheEntry entry(meta_cache_->Value(handle));
 
@@ -612,7 +611,7 @@ bool DataCache::Partition::InsertIntoCache(const Slice& key, CacheFile* cache_fi
   const int64_t charge_len = BitUtil::RoundUp(buffer_len, PAGE_SIZE);
 
   // Allocate a cache handle
-  kudu::Cache::UniquePendingHandle pending_handle(
+  Cache::UniquePendingHandle pending_handle(
       meta_cache_->Allocate(key, sizeof(CacheEntry), charge_len));
   if (UNLIKELY(pending_handle.get() == nullptr)) return false;
 
@@ -629,7 +628,7 @@ bool DataCache::Partition::InsertIntoCache(const Slice& key, CacheFile* cache_fi
   // Insert the new entry into the cache.
   CacheEntry entry(cache_file, insertion_offset, buffer_len, checksum);
   memcpy(meta_cache_->MutableValue(&pending_handle), &entry, sizeof(CacheEntry));
-  kudu::Cache::UniqueHandle handle(meta_cache_->Insert(std::move(pending_handle), this));
+  Cache::UniqueHandle handle(meta_cache_->Insert(std::move(pending_handle), this));
   pending_handle = nullptr;
   ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->Increment(charge_len);
   return true;
@@ -645,8 +644,7 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe
 
   // Check for existing entry.
   {
-    kudu::Cache::UniqueHandle handle(
-        meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE));
+    Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
     if (handle.get() != nullptr) {
       if (HandleExistingEntry(key, handle, buffer, buffer_len)) return false;
     }
diff --git a/be/src/runtime/io/data-cache.h b/be/src/runtime/io/data-cache.h
index fde1e97..1dd3e38 100644
--- a/be/src/runtime/io/data-cache.h
+++ b/be/src/runtime/io/data-cache.h
@@ -25,16 +25,12 @@
 #include <gtest/gtest_prod.h>
 
 #include "common/status.h"
+#include "util/cache/cache.h"
 #include "util/spinlock.h"
 #include "util/thread-pool.h"
-#include "kudu/util/cache.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 
-namespace kudu {
-class Cache;
-} // kudu
-
 /// This class is an implementation of an IO data cache which is backed by local storage.
 /// It implicitly relies on the OS page cache management to shuffle data between memory
 /// and the storage device. This is useful for caching data read from remote filesystems
@@ -192,7 +188,7 @@ class DataCache {
 
   /// An implementation of a cache partition. Each partition maintains its own set of
   /// cache keys in a LRU cache.
-  class Partition : public kudu::Cache::EvictionCallback {
+  class Partition : public Cache::EvictionCallback {
    public:
     /// Creates a partition at the given directory 'path' with quota 'capacity' in bytes.
     /// 'max_opened_files' is the maximum number of opened files allowed per partition.
@@ -298,7 +294,7 @@ class DataCache {
     ///
     /// A cache entry has type CacheEntry and it contains the metadata of the cached
     /// content. Please see comments at CachedEntry for details.
-    std::unique_ptr<kudu::Cache> meta_cache_;
+    std::unique_ptr<Cache> meta_cache_;
 
     std::unique_ptr<Tracer> tracer_;
 
@@ -325,7 +321,7 @@ class DataCache {
     /// work needs to be done. Returns false otherwise. In which case, the existing entry
     /// will be overwritten.
     bool HandleExistingEntry(const kudu::Slice& key,
-        const kudu::Cache::UniqueHandle& handle, const uint8_t* buffer,
+        const Cache::UniqueHandle& handle, const uint8_t* buffer,
         int64_t buffer_len);
 
     /// Helper function to insert a new entry with key 'key' into the LRU cache.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index ba4e94f..21127cb 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+add_subdirectory(cache)
+
 set(SQUEASEL_SRC_DIR "${CMAKE_SOURCE_DIR}/be/src/thirdparty/squeasel")
 set(MUSTACHE_SRC_DIR "${CMAKE_SOURCE_DIR}/be/src/thirdparty/mustache")
 set(MPFIT_SRC_DIR "${CMAKE_SOURCE_DIR}/be/src/thirdparty/mpfit")
diff --git a/be/src/util/cache/CMakeLists.txt b/be/src/util/cache/CMakeLists.txt
index c6065e6..cb3595c 100644
--- a/be/src/util/cache/CMakeLists.txt
+++ b/be/src/util/cache/CMakeLists.txt
@@ -23,9 +23,15 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/util/cache")
 
 add_library(UtilCache
   cache.cc
-  nvm_cache.cc
 )
 add_dependencies(UtilCache gen-deps gen_ir_descriptions)
 
-# Currently does not build cache-test or cache-bench, because they depend on Kudu
-# build infrastructure.
+add_library(UtilCacheTests STATIC
+  cache-test.cc
+)
+add_dependencies(UtilCacheTests gen-deps gen_ir_descriptions)
+
+add_executable(cache-bench cache-bench.cc)
+target_link_libraries(cache-bench ${IMPALA_TEST_LINK_LIBS})
+
+ADD_UNIFIED_BE_LSAN_TEST(cache-test "CacheTypes/CacheTest.*:CacheTypes/LRUCacheTest.*:FIFOCacheTest.*")
diff --git a/be/src/util/cache/cache-bench.cc b/be/src/util/cache/cache-bench.cc
index 91026ae..ec26484 100644
--- a/be/src/util/cache/cache-bench.cc
+++ b/be/src/util/cache/cache-bench.cc
@@ -33,12 +33,12 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/human_readable.h"
-#include "kudu/util/cache.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/test_util.h"
+#include "util/cache/cache.h"
+#include "testutil/gtest-util.h"
 
 DEFINE_int32(num_threads, 16, "The number of threads to access the cache concurrently.");
 DEFINE_int32(run_seconds, 1, "The number of seconds to run the benchmark");
@@ -50,7 +50,7 @@ using std::thread;
 using std::unique_ptr;
 using std::vector;
 
-namespace kudu {
+namespace impala {
 
 // Benchmark a 1GB cache.
 static constexpr int kCacheCapacity = 1024 * 1024 * 1024;
@@ -90,11 +90,10 @@ struct BenchSetup {
   }
 };
 
-class CacheBench : public KuduTest,
+class CacheBench : public testing::Test,
                    public testing::WithParamInterface<BenchSetup>{
  public:
   void SetUp() override {
-    KuduTest::SetUp();
     cache_.reset(NewCache(kCacheCapacity, "test-cache"));
   }
 
@@ -102,15 +101,18 @@ class CacheBench : public KuduTest,
   // Returns a pair of the number of cache hits and lookups.
   pair<int64_t, int64_t> DoQueries(const atomic<bool>* done) {
     const BenchSetup& setup = GetParam();
-    Random r(GetRandomSeed32());
+    kudu::Random r(kudu::GetRandomSeed32());
     int64_t lookups = 0;
     int64_t hits = 0;
+    // Add max_key variable and test to avoid division by zero warning from clang-tidy
+    uint32_t max_key = setup.max_key();
+    if (max_key == 0) return {0, 0};
     while (!*done) {
       uint32_t int_key;
       if (setup.pattern == BenchSetup::Pattern::ZIPFIAN) {
-        int_key = r.Skewed(Bits::Log2Floor(setup.max_key()));
+        int_key = r.Skewed(Bits::Log2Floor(max_key));
       } else {
-        int_key = r.Uniform(setup.max_key());
+        int_key = r.Uniform(max_key);
       }
       char key_buf[sizeof(int_key)];
       memcpy(key_buf, &int_key, sizeof(int_key));
@@ -142,7 +144,7 @@ class CacheBench : public KuduTest,
           total_lookups += hits_lookups.second;
         });
     }
-    SleepFor(MonoDelta::FromSeconds(n_seconds));
+    kudu::SleepFor(kudu::MonoDelta::FromSeconds(n_seconds));
     done = true;
     for (auto& t : threads) {
       t.join();
@@ -184,4 +186,6 @@ TEST_P(CacheBench, RunBench) {
   LOG(INFO) << test_case << ": " << StringPrintf("%.1f", hit_rate * 100.0) << "% hit rate";
 }
 
-} // namespace kudu
+} // namespace impala
+
+IMPALA_TEST_MAIN();
diff --git a/be/src/util/cache/cache-test.cc b/be/src/util/cache/cache-test.cc
index 6ba34b2..265b0b9 100644
--- a/be/src/util/cache/cache-test.cc
+++ b/be/src/util/cache/cache-test.cc
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include "kudu/util/cache.h"
+#include "util/cache/cache.h"
 
 #include <cstring>
 #include <memory>
@@ -18,20 +18,14 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/block_cache_metrics.h"
-#include "kudu/util/cache_metrics.h"
 #include "kudu/util/coding.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
-#include "kudu/util/nvm_cache.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
 
 DECLARE_bool(cache_force_single_shard);
-DECLARE_string(nvm_cache_path);
 
 DECLARE_double(cache_memtracker_approximation_ratio);
 
@@ -42,17 +36,17 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
-namespace kudu {
+namespace impala {
 
 // Conversions between numeric keys/values and the types expected by Cache.
 static std::string EncodeInt(int k) {
-  faststring result;
-  PutFixed32(&result, k);
+  kudu::faststring result;
+  kudu::PutFixed32(&result, k);
   return result.ToString();
 }
 static int DecodeInt(const Slice& k) {
   CHECK_EQ(4, k.size());
-  return DecodeFixed32(k.data());
+  return kudu::DecodeFixed32(k.data());
 }
 
 // Cache sharding policy affects the composition of the cache. Some test
@@ -62,7 +56,7 @@ enum class ShardingPolicy {
   SingleShard,
 };
 
-class CacheBaseTest : public KuduTest,
+class CacheBaseTest : public ::testing::Test,
                       public Cache::EvictionCallback {
  public:
   explicit CacheBaseTest(size_t cache_size)
@@ -110,11 +104,6 @@ class CacheBaseTest : public KuduTest,
     FLAGS_cache_force_single_shard =
         (sharding_policy == ShardingPolicy::SingleShard);
 
-    if (google::GetCommandLineFlagInfoOrDie("nvm_cache_path").is_default) {
-      FLAGS_nvm_cache_path = GetTestPath("nvm-cache");
-      ASSERT_OK(Env::Default()->CreateDir(FLAGS_nvm_cache_path));
-    }
-
     switch (eviction_policy) {
       case Cache::EvictionPolicy::FIFO:
         if (mem_type != Cache::MemoryType::DRAM) {
@@ -123,7 +112,7 @@ class CacheBaseTest : public KuduTest,
         cache_.reset(NewCache<Cache::EvictionPolicy::FIFO,
                               Cache::MemoryType::DRAM>(cache_size(),
                                                        "cache_test"));
-        MemTracker::FindTracker("cache_test-sharded_fifo_cache", &mem_tracker_);
+        kudu::MemTracker::FindTracker("cache_test-sharded_fifo_cache", &mem_tracker_);
         break;
       case Cache::EvictionPolicy::LRU:
         switch (mem_type) {
@@ -132,22 +121,13 @@ class CacheBaseTest : public KuduTest,
                                   Cache::MemoryType::DRAM>(cache_size(),
                                                            "cache_test"));
             break;
-          case Cache::MemoryType::NVM:
-            if (CanUseNVMCacheForTests()) {
-              cache_.reset(NewCache<Cache::EvictionPolicy::LRU,
-                                    Cache::MemoryType::NVM>(cache_size(),
-                                                            "cache_test"));
-            }
-            break;
           default:
             FAIL() << mem_type << ": unrecognized cache memory type";
-            break;
         }
-        MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
+        kudu::MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
         break;
       default:
         FAIL() << "unrecognized cache eviction policy";
-        break;
     }
 
     // Since nvm cache does not have memtracker due to the use of
@@ -155,23 +135,13 @@ class CacheBaseTest : public KuduTest,
     if (mem_type == Cache::MemoryType::DRAM) {
       ASSERT_TRUE(mem_tracker_.get());
     }
-
-    // cache_ will be null if we're trying to set up a test for the NVM cache
-    // and were unable to do so.
-    if (cache_) {
-      scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(
-          &metric_registry_, "test");
-      unique_ptr<BlockCacheMetrics> metrics(new BlockCacheMetrics(entity));
-      cache_->SetMetrics(std::move(metrics));
-    }
   }
 
   const size_t cache_size_;
   vector<int> evicted_keys_;
   vector<int> evicted_values_;
-  shared_ptr<MemTracker> mem_tracker_;
+  shared_ptr<kudu::MemTracker> mem_tracker_;
   unique_ptr<Cache> cache_;
-  MetricRegistry metric_registry_;
 };
 
 class CacheTest :
@@ -206,16 +176,9 @@ INSTANTIATE_TEST_CASE_P(
                    ShardingPolicy::MultiShard),
         make_tuple(Cache::MemoryType::DRAM,
                    Cache::EvictionPolicy::LRU,
-                   ShardingPolicy::SingleShard),
-        make_tuple(Cache::MemoryType::NVM,
-                   Cache::EvictionPolicy::LRU,
-                   ShardingPolicy::MultiShard),
-        make_tuple(Cache::MemoryType::NVM,
-                   Cache::EvictionPolicy::LRU,
                    ShardingPolicy::SingleShard)));
 
 TEST_P(CacheTest, TrackMemory) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   if (mem_tracker_) {
     Insert(100, 100, 1);
     ASSERT_EQ(1, mem_tracker_->consumption());
@@ -226,7 +189,6 @@ TEST_P(CacheTest, TrackMemory) {
 }
 
 TEST_P(CacheTest, HitAndMiss) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   ASSERT_EQ(-1, Lookup(100));
 
   Insert(100, 101);
@@ -250,7 +212,6 @@ TEST_P(CacheTest, HitAndMiss) {
 }
 
 TEST_P(CacheTest, Erase) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   Erase(200);
   ASSERT_EQ(0, evicted_keys_.size());
 
@@ -270,7 +231,6 @@ TEST_P(CacheTest, Erase) {
 }
 
 TEST_P(CacheTest, EntriesArePinned) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   Insert(100, 101);
   auto h1 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
   ASSERT_EQ(101, DecodeInt(cache_->Value(h1)));
@@ -299,7 +259,6 @@ TEST_P(CacheTest, EntriesArePinned) {
 // size of items still in the cache, which must be approximately the
 // same as the total capacity.
 TEST_P(CacheTest, HeavyEntries) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   const int kLight = cache_size() / 1000;
   const int kHeavy = cache_size() / 100;
   int added = 0;
@@ -324,7 +283,6 @@ TEST_P(CacheTest, HeavyEntries) {
 }
 
 TEST_P(CacheTest, InvalidateAllEntries) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   constexpr const int kEntriesNum = 1024;
   // This scenarios assumes no evictions are done at the cache capacity.
   ASSERT_LE(kEntriesNum, cache_size());
@@ -348,7 +306,6 @@ TEST_P(CacheTest, InvalidateAllEntries) {
 }
 
 TEST_P(CacheTest, InvalidateNoEntries) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   constexpr const int kEntriesNum = 10;
   // This scenarios assumes no evictions are done at the cache capacity.
   ASSERT_LE(kEntriesNum, cache_size());
@@ -370,7 +327,6 @@ TEST_P(CacheTest, InvalidateNoEntries) {
 }
 
 TEST_P(CacheTest, InvalidateNoEntriesNoAdvanceIterationFunctor) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   constexpr const int kEntriesNum = 256;
   // This scenarios assumes no evictions are done at the cache capacity.
   ASSERT_LE(kEntriesNum, cache_size());
@@ -398,7 +354,6 @@ TEST_P(CacheTest, InvalidateNoEntriesNoAdvanceIterationFunctor) {
 }
 
 TEST_P(CacheTest, InvalidateOddKeyEntries) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   constexpr const int kEntriesNum = 64;
   // This scenarios assumes no evictions are done at the cache capacity.
   ASSERT_LE(kEntriesNum, cache_size());
@@ -489,8 +444,7 @@ TEST_F(FIFOCacheTest, EvictionPolicy) {
 
 class LRUCacheTest :
     public CacheBaseTest,
-    public ::testing::WithParamInterface<tuple<Cache::MemoryType,
-                                               ShardingPolicy>> {
+    public ::testing::WithParamInterface<ShardingPolicy> {
  public:
   LRUCacheTest()
       : CacheBaseTest(16 * 1024 * 1024) {
@@ -498,21 +452,18 @@ class LRUCacheTest :
 
   void SetUp() override {
     const auto& param = GetParam();
-    SetupWithParameters(std::get<0>(param),
+    SetupWithParameters(Cache::MemoryType::DRAM,
                         Cache::EvictionPolicy::LRU,
-                        std::get<1>(param));
+                        param);
   }
 };
 
 INSTANTIATE_TEST_CASE_P(
     CacheTypes, LRUCacheTest,
-    ::testing::Combine(::testing::Values(Cache::MemoryType::DRAM,
-                                         Cache::MemoryType::NVM),
-                       ::testing::Values(ShardingPolicy::MultiShard,
-                                         ShardingPolicy::SingleShard)));
+    ::testing::Values(ShardingPolicy::MultiShard,
+                      ShardingPolicy::SingleShard));
 
 TEST_P(LRUCacheTest, EvictionPolicy) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   static constexpr int kNumElems = 1000;
   const int size_per_elem = cache_size() / kNumElems;
 
@@ -532,4 +483,5 @@ TEST_P(LRUCacheTest, EvictionPolicy) {
   ASSERT_EQ(-1, Lookup(200));
 }
 
-}  // namespace kudu
+}  // namespace impala
+
diff --git a/be/src/util/cache/cache.cc b/be/src/util/cache/cache.cc
index 073fa51..75e6ff6 100644
--- a/be/src/util/cache/cache.cc
+++ b/be/src/util/cache/cache.cc
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include "kudu/util/cache.h"
+#include "util/cache/cache.h"
 
 #include <atomic>
 #include <cstdint>
@@ -26,24 +26,16 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
 #include "kudu/util/alignment.h"
-#include "kudu/util/cache_metrics.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/test_util_prod.h"
 
 // Useful in tests that require accurate cache capacity accounting.
-DEFINE_bool(cache_force_single_shard, false,
-            "Override all cache implementations to use just one shard");
-TAG_FLAG(cache_force_single_shard, hidden);
+DECLARE_bool(cache_force_single_shard);
 
-DEFINE_double(cache_memtracker_approximation_ratio, 0.01,
-              "The MemTracker associated with a cache can accumulate error up to "
-              "this ratio to improve performance. For tests.");
-TAG_FLAG(cache_memtracker_approximation_ratio, hidden);
+DECLARE_double(cache_memtracker_approximation_ratio);
 
 using std::atomic;
 using std::shared_ptr;
@@ -51,7 +43,7 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
-namespace kudu {
+namespace impala {
 
 Cache::~Cache() {
 }
@@ -200,7 +192,6 @@ string ToString(Cache::EvictionPolicy p) {
       return "lru";
     default:
       LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
-      break;
   }
   return "unknown";
 }
@@ -209,7 +200,7 @@ string ToString(Cache::EvictionPolicy p) {
 template<Cache::EvictionPolicy policy>
 class CacheShard {
  public:
-  explicit CacheShard(MemTracker* tracker);
+  explicit CacheShard(kudu::MemTracker* tracker);
   ~CacheShard();
 
   // Separate from constructor so caller can easily make an array of CacheShard
@@ -218,8 +209,6 @@ class CacheShard {
     max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
   }
 
-  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
-
   Cache::Handle* Insert(RLHandle* handle, Cache::EvictionCallback* eviction_callback);
   // Like Cache::Lookup, but with an extra "hash" parameter.
   Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
@@ -259,14 +248,11 @@ class CacheShard {
   // Positive delta indicates an increased memory consumption.
   void UpdateMemTracker(int64_t delta);
 
-  // Update the metrics for a lookup operation in the cache.
-  void UpdateMetricsLookup(bool was_hit, bool caching);
-
   // Initialized before use.
   size_t capacity_;
 
   // mutex_ protects the following state.
-  simple_spinlock mutex_;
+  kudu::simple_spinlock mutex_;
   size_t usage_;
 
   // Dummy head of recency list.
@@ -275,21 +261,18 @@ class CacheShard {
 
   HandleTable table_;
 
-  MemTracker* mem_tracker_;
+  kudu::MemTracker* mem_tracker_;
   atomic<int64_t> deferred_consumption_ { 0 };
 
   // Initialized based on capacity_ to ensure an upper bound on the error on the
   // MemTracker consumption.
   int64_t max_deferred_consumption_;
-
-  CacheMetrics* metrics_;
 };
 
 template<Cache::EvictionPolicy policy>
-CacheShard<policy>::CacheShard(MemTracker* tracker)
+CacheShard<policy>::CacheShard(kudu::MemTracker* tracker)
     : usage_(0),
-      mem_tracker_(tracker),
-      metrics_(nullptr) {
+      mem_tracker_(tracker) {
   // Make empty circular linked list.
   rl_.next = &rl_;
   rl_.prev = &rl_;
@@ -322,10 +305,6 @@ void CacheShard<policy>::FreeEntry(RLHandle* e) {
     e->eviction_callback->EvictedEntry(e->key(), e->value());
   }
   UpdateMemTracker(-static_cast<int64_t>(e->charge));
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->cache_usage->DecrementBy(e->charge);
-    metrics_->evictions->Increment();
-  }
   delete [] e;
 }
 
@@ -342,26 +321,6 @@ void CacheShard<policy>::UpdateMemTracker(int64_t delta) {
 }
 
 template<Cache::EvictionPolicy policy>
-void CacheShard<policy>::UpdateMetricsLookup(bool was_hit, bool caching) {
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->lookups->Increment();
-    if (was_hit) {
-      if (caching) {
-        metrics_->cache_hits_caching->Increment();
-      } else {
-        metrics_->cache_hits->Increment();
-      }
-    } else {
-      if (caching) {
-        metrics_->cache_misses_caching->Increment();
-      } else {
-        metrics_->cache_misses->Increment();
-      }
-    }
-  }
-}
-
-template<Cache::EvictionPolicy policy>
 void CacheShard<policy>::RL_Remove(RLHandle* e) {
   e->next->prev = e->prev;
   e->prev->next = e->next;
@@ -403,9 +362,6 @@ Cache::Handle* CacheShard<policy>::Lookup(const Slice& key,
     }
   }
 
-  // Do the metrics outside of the lock.
-  UpdateMetricsLookup(e != nullptr, caching);
-
   return reinterpret_cast<Cache::Handle*>(e);
 }
 
@@ -428,10 +384,6 @@ Cache::Handle* CacheShard<policy>::Insert(
   // Two refs for the handle: one from CacheShard, one for the returned handle.
   handle->refs.store(2, std::memory_order_relaxed);
   UpdateMemTracker(handle->charge);
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->cache_usage->IncrementBy(handle->charge);
-    metrics_->inserts->Increment();
-  }
 
   RLHandle* to_remove_head = nullptr;
   {
@@ -550,7 +502,7 @@ class ShardedCache : public Cache {
     // A cache is often a singleton, so:
     // 1. We reuse its MemTracker if one already exists, and
     // 2. It is directly parented to the root MemTracker.
-    mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
+    mem_tracker_ = kudu::MemTracker::FindOrCreateGlobalTracker(
         -1, strings::Substitute("$0-sharded_$1_cache", id, ToString(policy)));
 
     int num_shards = 1 << shard_bits_;
@@ -567,22 +519,6 @@ class ShardedCache : public Cache {
     STLDeleteElements(&shards_);
   }
 
-  void SetMetrics(std::unique_ptr<CacheMetrics> metrics) override {
-    // TODO(KUDU-2165): reuse of the Cache singleton across multiple MiniCluster servers
-    // causes TSAN errors. So, we'll ensure that metrics only get attached once, from
-    // whichever server starts first. This has the downside that, in test builds, we won't
-    // get accurate cache metrics, but that's probably better than spurious failures.
-    std::lock_guard<decltype(metrics_lock_)> l(metrics_lock_);
-    if (metrics_) {
-      CHECK(IsGTest()) << "Metrics should only be set once per Cache singleton";
-      return;
-    }
-    metrics_ = std::move(metrics);
-    for (auto* cache : shards_) {
-      cache->SetMetrics(metrics_.get());
-    }
-  }
-
   UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override {
     const uint32_t hash = HashSlice(key);
     return UniqueHandle(
@@ -624,7 +560,7 @@ class ShardedCache : public Cache {
     // TODO(KUDU-1091): account for the footprint of structures used by Cache's
     //                  internal housekeeping (RL handles, etc.) in case of
     //                  non-automatic charge.
-    handle->charge = (charge == kAutomaticCharge) ? kudu_malloc_usable_size(h.get())
+    handle->charge = (charge == kAutomaticCharge) ? kudu::kudu_malloc_usable_size(h.get())
                                                   : charge;
     handle->hash = HashSlice(key);
     memcpy(handle->kv_data, key.data(), key_len);
@@ -667,16 +603,11 @@ class ShardedCache : public Cache {
     return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
   }
 
-  shared_ptr<MemTracker> mem_tracker_;
-  unique_ptr<CacheMetrics> metrics_;
+  shared_ptr<kudu::MemTracker> mem_tracker_;
   vector<CacheShard<policy>*> shards_;
 
   // Number of bits of hash used to determine the shard.
   const int shard_bits_;
-
-  // Protects 'metrics_'. Used only when metrics are set, to ensure
-  // that they are set only once in test environments.
-  simple_spinlock metrics_lock_;
 };
 
 }  // end anonymous namespace
@@ -698,9 +629,6 @@ std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
     case Cache::MemoryType::DRAM:
       os << "DRAM";
       break;
-    case Cache::MemoryType::NVM:
-      os << "NVM";
-      break;
     default:
       os << "unknown (" << static_cast<int>(mem_type) << ")";
       break;
@@ -708,4 +636,4 @@ std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
   return os;
 }
 
-}  // namespace kudu
+}  // namespace impala
diff --git a/be/src/util/cache/cache.h b/be/src/util/cache/cache.h
index ff3a309..a64fe19 100644
--- a/be/src/util/cache/cache.h
+++ b/be/src/util/cache/cache.h
@@ -28,16 +28,15 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/util/slice.h"
 
-namespace kudu {
+using kudu::Slice;
 
-struct CacheMetrics;
+namespace impala {
 
 class Cache {
  public:
   // Type of memory backing the cache's storage.
   enum class MemoryType {
-    DRAM,
-    NVM,
+    DRAM
   };
 
   // Supported eviction policies for the cache. Eviction policy determines what
@@ -65,9 +64,6 @@ class Cache {
   // function that was passed to the constructor.
   virtual ~Cache();
 
-  // Set the cache metrics to update corresponding counters accordingly.
-  virtual void SetMetrics(std::unique_ptr<CacheMetrics> metrics) = 0;
-
   // Opaque handle to an entry stored in the cache.
   struct Handle { };
 
@@ -360,4 +356,4 @@ Cache* NewCache<Cache::EvictionPolicy::LRU,
 // A helper method to output cache memory type into ostream.
 std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type);
 
-} // namespace kudu
+} // namespace impala
diff --git a/be/src/util/cache/nvm_cache.cc b/be/src/util/cache/nvm_cache.cc
deleted file mode 100644
index d7860ac..0000000
--- a/be/src/util/cache/nvm_cache.cc
+++ /dev/null
@@ -1,719 +0,0 @@
-// This file is derived from cache.cc in the LevelDB project:
-//
-//   Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
-//   Use of this source code is governed by a BSD-style license that can be
-//   found in the LICENSE file.
-//
-// ------------------------------------------------------------
-// This file implements a cache based on the MEMKIND library (http://memkind.github.io/memkind/)
-// This library makes it easy to program against persistent memory hardware by exposing an API
-// which parallels malloc/free, but allocates from persistent memory instead of DRAM.
-//
-// We use this API to implement a cache which treats persistent memory or
-// non-volatile memory as if it were a larger cheaper bank of volatile memory. We
-// currently make no use of its persistence properties.
-//
-// Currently, we only store key/value in NVM. All other data structures such as the
-// ShardedLRUCache instances, hash table, etc are in DRAM. The assumption is that
-// the ratio of data stored vs overhead is quite high.
-
-#include "kudu/util/nvm_cache.h"
-
-#include <dlfcn.h>
-
-#include <cstdint>
-#include <cstring>
-#include <iostream>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
-#include <glog/logging.h>
-
-#include "kudu/gutil/atomic_refcount.h"
-#include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/bits.h"
-#include "kudu/gutil/dynamic_annotations.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/hash/city.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/sysinfo.h"
-#include "kudu/util/cache.h"
-#include "kudu/util/cache_metrics.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/slice.h"
-#include "kudu/util/status.h"
-
-#ifndef MEMKIND_PMEM_MIN_SIZE
-#define MEMKIND_PMEM_MIN_SIZE (1024 * 1024 * 16) // Taken from memkind 1.9.0.
-#endif
-
-struct memkind;
-
-// Useful in tests that require accurate cache capacity accounting.
-DECLARE_bool(cache_force_single_shard);
-
-DEFINE_string(nvm_cache_path, "/pmem",
-              "The path at which the NVM cache will try to allocate its memory. "
-              "This can be a tmpfs or ramfs for testing purposes.");
-
-DEFINE_bool(nvm_cache_simulate_allocation_failure, false,
-            "If true, the NVM cache will inject failures in calls to memkind_malloc "
-            "for testing.");
-TAG_FLAG(nvm_cache_simulate_allocation_failure, unsafe);
-
-using std::string;
-using std::unique_ptr;
-using std::vector;
-using strings::Substitute;
-
-namespace kudu {
-
-namespace {
-
-// Taken together, these typedefs and this macro make it easy to call a
-// memkind function:
-//
-//  CALL_MEMKIND(memkind_malloc, vmp_, size);
-typedef int (*memkind_create_pmem)(const char*, size_t, memkind**);
-typedef int (*memkind_destroy_kind)(memkind*);
-typedef void* (*memkind_malloc)(memkind*, size_t);
-typedef size_t (*memkind_malloc_usable_size)(memkind*, void*);
-typedef void (*memkind_free)(memkind*, void*);
-#define CALL_MEMKIND(func_name, ...) ((func_name)g_##func_name)(__VA_ARGS__)
-
-// Function pointers into memkind; set by InitMemkindOps().
-void* g_memkind_create_pmem;
-void* g_memkind_destroy_kind;
-void* g_memkind_malloc;
-void* g_memkind_malloc_usable_size;
-void* g_memkind_free;
-
-// After InitMemkindOps() is called, true if memkind is available and safe
-// to use, false otherwise.
-bool g_memkind_available;
-
-std::once_flag g_memkind_ops_flag;
-
-// Try to dlsym() a particular symbol from 'handle', storing the result in 'ptr'
-// if successful.
-Status TryDlsym(void* handle, const char* sym, void** ptr) {
-  dlerror(); // Need to clear any existing error first.
-  void* ret = dlsym(handle, sym);
-  char* error = dlerror();
-  if (error) {
-    return Status::NotSupported(Substitute("could not dlsym $0", sym), error);
-  }
-  *ptr = ret;
-  return Status::OK();
-}
-
-// Try to dlopen() memkind and set up all the function pointers we need from it.
-//
-// Note: in terms of protecting ourselves against changes in memkind, we'll
-// notice (and fail) if a symbol is missing, but not if it's signature has
-// changed or if there's some subtle behavioral change. A scan of the memkind
-// repo suggests that backwards compatibility is enforced: symbols are only
-// added and behavioral changes are effected via the introduction of new symbols.
-void InitMemkindOps() {
-  g_memkind_available = false;
-
-  // Use RTLD_NOW so that if any of memkind's dependencies aren't satisfied
-  // (e.g. libnuma is too old and is missing symbols), we'll know up front
-  // instead of during cache operations.
-  void* memkind_lib = dlopen("libmemkind.so.0", RTLD_NOW);
-  if (!memkind_lib) {
-    LOG(WARNING) << "could not dlopen: " << dlerror();
-    return;
-  }
-  auto cleanup = MakeScopedCleanup([&]() {
-    dlclose(memkind_lib);
-  });
-
-#define DLSYM_OR_RETURN(func_name, handle) do { \
-    const Status _s = TryDlsym(memkind_lib, func_name, handle); \
-    if (!_s.ok()) { \
-      LOG(WARNING) << _s.ToString(); \
-      return; \
-    } \
-  } while (0)
-
-  DLSYM_OR_RETURN("memkind_create_pmem", &g_memkind_create_pmem);
-  DLSYM_OR_RETURN("memkind_destroy_kind", &g_memkind_destroy_kind);
-  DLSYM_OR_RETURN("memkind_malloc", &g_memkind_malloc);
-  DLSYM_OR_RETURN("memkind_malloc_usable_size", &g_memkind_malloc_usable_size);
-  DLSYM_OR_RETURN("memkind_free", &g_memkind_free);
-#undef DLSYM_OR_RETURN
-
-  g_memkind_available = true;
-
-  // Need to keep the memkind library handle open so our function pointers
-  // remain loaded in memory.
-  cleanup.cancel();
-}
-
-typedef simple_spinlock MutexType;
-
-// LRU cache implementation
-
-// An entry is a variable length heap-allocated structure.  Entries
-// are kept in a circular doubly linked list ordered by access time.
-struct LRUHandle {
-  Cache::EvictionCallback* eviction_callback;
-  LRUHandle* next_hash;
-  LRUHandle* next;
-  LRUHandle* prev;
-  size_t charge;      // TODO(opt): Only allow uint32_t?
-  uint32_t key_length;
-  uint32_t val_length;
-  Atomic32 refs;
-  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
-  uint8_t* kv_data;
-
-  Slice key() const {
-    return Slice(kv_data, key_length);
-  }
-
-  Slice value() const {
-    return Slice(&kv_data[key_length], val_length);
-  }
-
-  uint8_t* val_ptr() {
-    return &kv_data[key_length];
-  }
-};
-
-// We provide our own simple hash table since it removes a whole bunch
-// of porting hacks and is also faster than some of the built-in hash
-// table implementations in some of the compiler/runtime combinations
-// we have tested.  E.g., readrandom speeds up by ~5% over the g++
-// 4.4.3's builtin hashtable.
-class HandleTable {
- public:
-  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
-  ~HandleTable() { delete[] list_; }
-
-  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
-    return *FindPointer(key, hash);
-  }
-
-  LRUHandle* Insert(LRUHandle* h) {
-    LRUHandle** ptr = FindPointer(h->key(), h->hash);
-    LRUHandle* old = *ptr;
-    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
-    *ptr = h;
-    if (old == nullptr) {
-      ++elems_;
-      if (elems_ > length_) {
-        // Since each cache entry is fairly large, we aim for a small
-        // average linked list length (<= 1).
-        Resize();
-      }
-    }
-    return old;
-  }
-
-  LRUHandle* Remove(const Slice& key, uint32_t hash) {
-    LRUHandle** ptr = FindPointer(key, hash);
-    LRUHandle* result = *ptr;
-    if (result != nullptr) {
-      *ptr = result->next_hash;
-      --elems_;
-    }
-    return result;
-  }
-
- private:
-  // The table consists of an array of buckets where each bucket is
-  // a linked list of cache entries that hash into the bucket.
-  uint32_t length_;
-  uint32_t elems_;
-  LRUHandle** list_;
-
-  // Return a pointer to slot that points to a cache entry that
-  // matches key/hash.  If there is no such cache entry, return a
-  // pointer to the trailing slot in the corresponding linked list.
-  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
-    LRUHandle** ptr = &list_[hash & (length_ - 1)];
-    while (*ptr != nullptr &&
-           ((*ptr)->hash != hash || key != (*ptr)->key())) {
-      ptr = &(*ptr)->next_hash;
-    }
-    return ptr;
-  }
-
-  void Resize() {
-    uint32_t new_length = 16;
-    while (new_length < elems_ * 1.5) {
-      new_length *= 2;
-    }
-    LRUHandle** new_list = new LRUHandle*[new_length];
-    memset(new_list, 0, sizeof(new_list[0]) * new_length);
-    uint32_t count = 0;
-    for (uint32_t i = 0; i < length_; i++) {
-      LRUHandle* h = list_[i];
-      while (h != nullptr) {
-        LRUHandle* next = h->next_hash;
-        uint32_t hash = h->hash;
-        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
-        h->next_hash = *ptr;
-        *ptr = h;
-        h = next;
-        count++;
-      }
-    }
-    DCHECK_EQ(elems_, count);
-    delete[] list_;
-    list_ = new_list;
-    length_ = new_length;
-  }
-};
-
-// A single shard of sharded cache.
-class NvmLRUCache {
- public:
-  explicit NvmLRUCache(memkind *vmp);
-  ~NvmLRUCache();
-
-  // Separate from constructor so caller can easily make an array of LRUCache
-  void SetCapacity(size_t capacity) { capacity_ = capacity; }
-
-  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
-
-  Cache::Handle* Insert(LRUHandle* e, Cache::EvictionCallback* eviction_callback);
-
-  // Like Cache::Lookup, but with an extra "hash" parameter.
-  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
-  void Release(Cache::Handle* handle);
-  void Erase(const Slice& key, uint32_t hash);
-  size_t Invalidate(const Cache::InvalidationControl& ctl);
-  void* Allocate(size_t size);
-
- private:
-  void NvmLRU_Remove(LRUHandle* e);
-  void NvmLRU_Append(LRUHandle* e);
-  // Just reduce the reference count by 1.
-  // Return true if last reference
-  bool Unref(LRUHandle* e);
-  void FreeEntry(LRUHandle* e);
-
-  // Evict the LRU item in the cache, adding it to the linked list
-  // pointed to by 'to_remove_head'.
-  void EvictOldestUnlocked(LRUHandle** to_remove_head);
-
-  // Free all of the entries in the linked list that has to_free_head
-  // as its head.
-  void FreeLRUEntries(LRUHandle* to_free_head);
-
-  // Wrapper around memkind_malloc which injects failures based on a flag.
-  void* MemkindMalloc(size_t size);
-
-  // Initialized before use.
-  size_t capacity_;
-
-  // mutex_ protects the following state.
-  MutexType mutex_;
-  size_t usage_;
-
-  // Dummy head of LRU list.
-  // lru.prev is newest entry, lru.next is oldest entry.
-  LRUHandle lru_;
-
-  HandleTable table_;
-
-  memkind* vmp_;
-
-  CacheMetrics* metrics_;
-};
-
-NvmLRUCache::NvmLRUCache(memkind* vmp)
-  : usage_(0),
-  vmp_(vmp),
-  metrics_(nullptr) {
-  // Make empty circular linked list
-  lru_.next = &lru_;
-  lru_.prev = &lru_;
-}
-
-NvmLRUCache::~NvmLRUCache() {
-  for (LRUHandle* e = lru_.next; e != &lru_; ) {
-    LRUHandle* next = e->next;
-    DCHECK_EQ(e->refs, 1);  // Error if caller has an unreleased handle
-    if (Unref(e)) {
-      FreeEntry(e);
-    }
-    e = next;
-  }
-}
-
-void* NvmLRUCache::MemkindMalloc(size_t size) {
-  if (PREDICT_FALSE(FLAGS_nvm_cache_simulate_allocation_failure)) {
-    return nullptr;
-  }
-  return CALL_MEMKIND(memkind_malloc, vmp_, size);
-}
-
-bool NvmLRUCache::Unref(LRUHandle* e) {
-  DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
-  return !base::RefCountDec(&e->refs);
-}
-
-void NvmLRUCache::FreeEntry(LRUHandle* e) {
-  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
-  if (e->eviction_callback) {
-    e->eviction_callback->EvictedEntry(e->key(), e->value());
-  }
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->cache_usage->DecrementBy(e->charge);
-    metrics_->evictions->Increment();
-  }
-  CALL_MEMKIND(memkind_free, vmp_, e);
-}
-
-// Allocate NVM memory.
-void* NvmLRUCache::Allocate(size_t size) {
-  return MemkindMalloc(size);
-}
-
-void NvmLRUCache::NvmLRU_Remove(LRUHandle* e) {
-  e->next->prev = e->prev;
-  e->prev->next = e->next;
-  usage_ -= e->charge;
-}
-
-void NvmLRUCache::NvmLRU_Append(LRUHandle* e) {
-  // Make "e" newest entry by inserting just before lru_
-  e->next = &lru_;
-  e->prev = lru_.prev;
-  e->prev->next = e;
-  e->next->prev = e;
-  usage_ += e->charge;
-}
-
-Cache::Handle* NvmLRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
- LRUHandle* e;
-  {
-    std::lock_guard<MutexType> l(mutex_);
-    e = table_.Lookup(key, hash);
-    if (e != nullptr) {
-      // If an entry exists, remove the old entry from the cache
-      // and re-add to the end of the linked list.
-      base::RefCountInc(&e->refs);
-      NvmLRU_Remove(e);
-      NvmLRU_Append(e);
-    }
-  }
-
-  // Do the metrics outside of the lock.
-  if (metrics_) {
-    metrics_->lookups->Increment();
-    bool was_hit = (e != nullptr);
-    if (was_hit) {
-      if (caching) {
-        metrics_->cache_hits_caching->Increment();
-      } else {
-        metrics_->cache_hits->Increment();
-      }
-    } else {
-      if (caching) {
-        metrics_->cache_misses_caching->Increment();
-      } else {
-        metrics_->cache_misses->Increment();
-      }
-    }
-  }
-
-  return reinterpret_cast<Cache::Handle*>(e);
-}
-
-void NvmLRUCache::Release(Cache::Handle* handle) {
-  LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
-  bool last_reference = Unref(e);
-  if (last_reference) {
-    FreeEntry(e);
-  }
-}
-
-void NvmLRUCache::EvictOldestUnlocked(LRUHandle** to_remove_head) {
-  LRUHandle* old = lru_.next;
-  NvmLRU_Remove(old);
-  table_.Remove(old->key(), old->hash);
-  if (Unref(old)) {
-    old->next = *to_remove_head;
-    *to_remove_head = old;
-  }
-}
-
-void NvmLRUCache::FreeLRUEntries(LRUHandle* to_free_head) {
-  while (to_free_head != nullptr) {
-    LRUHandle* next = to_free_head->next;
-    FreeEntry(to_free_head);
-    to_free_head = next;
-  }
-}
-
-Cache::Handle* NvmLRUCache::Insert(LRUHandle* e,
-                                   Cache::EvictionCallback* eviction_callback) {
-  DCHECK(e);
-  LRUHandle* to_remove_head = nullptr;
-
-  e->refs = 2;  // One from LRUCache, one for the returned handle
-  e->eviction_callback = eviction_callback;
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->cache_usage->IncrementBy(e->charge);
-    metrics_->inserts->Increment();
-  }
-
-  {
-    std::lock_guard<MutexType> l(mutex_);
-
-    NvmLRU_Append(e);
-
-    LRUHandle* old = table_.Insert(e);
-    if (old != nullptr) {
-      NvmLRU_Remove(old);
-      if (Unref(old)) {
-        old->next = to_remove_head;
-        to_remove_head = old;
-      }
-    }
-
-    while (usage_ > capacity_ && lru_.next != &lru_) {
-      EvictOldestUnlocked(&to_remove_head);
-    }
-  }
-
-  // we free the entries here outside of mutex for
-  // performance reasons
-  FreeLRUEntries(to_remove_head);
-
-  return reinterpret_cast<Cache::Handle*>(e);
-}
-
-void NvmLRUCache::Erase(const Slice& key, uint32_t hash) {
-  LRUHandle* e;
-  bool last_reference = false;
-  {
-    std::lock_guard<MutexType> l(mutex_);
-    e = table_.Remove(key, hash);
-    if (e != nullptr) {
-      NvmLRU_Remove(e);
-      last_reference = Unref(e);
-    }
-  }
-  // mutex not held here
-  // last_reference will only be true if e != nullptr
-  if (last_reference) {
-    FreeEntry(e);
-  }
-}
-
-size_t NvmLRUCache::Invalidate(const Cache::InvalidationControl& ctl) {
-  size_t invalid_entry_count = 0;
-  size_t valid_entry_count = 0;
-  LRUHandle* to_remove_head = nullptr;
-
-  {
-    std::lock_guard<MutexType> l(mutex_);
-
-    // rl_.next is the oldest entry in the recency list.
-    LRUHandle* h = lru_.next;
-    while (h != nullptr && h != &lru_ &&
-           ctl.iteration_func(valid_entry_count, invalid_entry_count)) {
-      if (ctl.validity_func(h->key(), h->value())) {
-        // Continue iterating over the list.
-        h = h->next;
-        ++valid_entry_count;
-        continue;
-      }
-      // Copy the handle slated for removal.
-      LRUHandle* h_to_remove = h;
-      // Prepare for next iteration of the cycle.
-      h = h->next;
-
-      NvmLRU_Remove(h_to_remove);
-      table_.Remove(h_to_remove->key(), h_to_remove->hash);
-      if (Unref(h_to_remove)) {
-        h_to_remove->next = to_remove_head;
-        to_remove_head = h_to_remove;
-      }
-      ++invalid_entry_count;
-    }
-  }
-
-  FreeLRUEntries(to_remove_head);
-
-  return invalid_entry_count;
-}
-
-// Determine the number of bits of the hash that should be used to determine
-// the cache shard. This, in turn, determines the number of shards.
-int DetermineShardBits() {
-  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
-      0 : Bits::Log2Ceiling(base::NumCPUs());
-  VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache.";
-  return bits;
-}
-
-class ShardedLRUCache : public Cache {
- private:
-  unique_ptr<CacheMetrics> metrics_;
-  vector<NvmLRUCache*> shards_;
-
-  // Number of bits of hash used to determine the shard.
-  const int shard_bits_;
-
-  memkind* vmp_;
-
-  static inline uint32_t HashSlice(const Slice& s) {
-    return util_hash::CityHash64(
-      reinterpret_cast<const char *>(s.data()), s.size());
-  }
-
-  uint32_t Shard(uint32_t hash) {
-    // Widen to uint64 before shifting, or else on a single CPU,
-    // we would try to shift a uint32_t by 32 bits, which is undefined.
-    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
-  }
-
- public:
-  explicit ShardedLRUCache(size_t capacity, const string& /*id*/, memkind* vmp)
-      : shard_bits_(DetermineShardBits()),
-        vmp_(vmp) {
-    int num_shards = 1 << shard_bits_;
-    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
-    for (int s = 0; s < num_shards; s++) {
-      gscoped_ptr<NvmLRUCache> shard(new NvmLRUCache(vmp_));
-      shard->SetCapacity(per_shard);
-      shards_.push_back(shard.release());
-    }
-  }
-
-  virtual ~ShardedLRUCache() {
-    STLDeleteElements(&shards_);
-    // Per the note at the top of this file, our cache is entirely volatile.
-    // Hence, when the cache is destructed, we delete the underlying
-    // memkind pool.
-    CALL_MEMKIND(memkind_destroy_kind, vmp_);
-  }
-
-  virtual UniqueHandle Insert(UniquePendingHandle handle,
-                              Cache::EvictionCallback* eviction_callback) OVERRIDE {
-    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle.release()));
-    return UniqueHandle(
-        shards_[Shard(h->hash)]->Insert(h, eviction_callback),
-        Cache::HandleDeleter(this));
-  }
-  virtual UniqueHandle Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
-    const uint32_t hash = HashSlice(key);
-    return UniqueHandle(
-        shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE),
-        Cache::HandleDeleter(this));
-  }
-  virtual void Release(Handle* handle) OVERRIDE {
-    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
-    shards_[Shard(h->hash)]->Release(handle);
-  }
-  virtual void Erase(const Slice& key) OVERRIDE {
-    const uint32_t hash = HashSlice(key);
-    shards_[Shard(hash)]->Erase(key, hash);
-  }
-  virtual Slice Value(const UniqueHandle& handle) const OVERRIDE {
-    return reinterpret_cast<const LRUHandle*>(handle.get())->value();
-  }
-  virtual uint8_t* MutableValue(UniquePendingHandle* handle) OVERRIDE {
-    return reinterpret_cast<LRUHandle*>(handle->get())->val_ptr();
-  }
-
-  virtual void SetMetrics(unique_ptr<CacheMetrics> metrics) OVERRIDE {
-    metrics_ = std::move(metrics);
-    for (NvmLRUCache* cache : shards_) {
-      cache->SetMetrics(metrics_.get());
-    }
-  }
-  virtual UniquePendingHandle Allocate(Slice key, int val_len, int charge) OVERRIDE {
-    int key_len = key.size();
-    DCHECK_GE(key_len, 0);
-    DCHECK_GE(val_len, 0);
-
-    // Try allocating from each of the shards -- if memkind is tight,
-    // this can cause eviction, so we might have better luck in different
-    // shards.
-    for (NvmLRUCache* cache : shards_) {
-      UniquePendingHandle ph(static_cast<PendingHandle*>(
-          cache->Allocate(sizeof(LRUHandle) + key_len + val_len)),
-          Cache::PendingHandleDeleter(this));
-      if (ph) {
-        LRUHandle* handle = reinterpret_cast<LRUHandle*>(ph.get());
-        uint8_t* buf = reinterpret_cast<uint8_t*>(ph.get());
-        handle->kv_data = &buf[sizeof(LRUHandle)];
-        handle->val_length = val_len;
-        handle->key_length = key_len;
-        handle->charge = (charge == kAutomaticCharge) ?
-            CALL_MEMKIND(memkind_malloc_usable_size, vmp_, buf) : charge;
-        handle->hash = HashSlice(key);
-        memcpy(handle->kv_data, key.data(), key.size());
-        return ph;
-      }
-    }
-    // TODO(unknown): increment a metric here on allocation failure.
-    return UniquePendingHandle(nullptr, Cache::PendingHandleDeleter(this));
-  }
-
-  virtual void Free(PendingHandle* ph) OVERRIDE {
-    CALL_MEMKIND(memkind_free, vmp_, ph);
-  }
-
-  size_t Invalidate(const InvalidationControl& ctl) override {
-    size_t invalidated_count = 0;
-    for (auto& shard: shards_) {
-      invalidated_count += shard->Invalidate(ctl);
-    }
-    return invalidated_count;
-  }
-};
-
-} // end anonymous namespace
-
-template<>
-Cache* NewCache<Cache::EvictionPolicy::LRU,
-                Cache::MemoryType::NVM>(size_t capacity, const std::string& id) {
-  std::call_once(g_memkind_ops_flag, InitMemkindOps);
-
-  // TODO(adar): we should plumb the failure up the call stack, but at the time
-  // of writing the NVM cache is only usable by the block cache, and its use of
-  // the singleton pattern prevents the surfacing of errors.
-  CHECK(g_memkind_available) << "Memkind not available!";
-
-  // memkind_create_pmem() will fail if the capacity is too small, but with
-  // an inscrutable error. So, we'll check ourselves.
-  CHECK_GE(capacity, MEMKIND_PMEM_MIN_SIZE)
-    << "configured capacity " << capacity << " bytes is less than "
-    << "the minimum capacity for an NVM cache: " << MEMKIND_PMEM_MIN_SIZE;
-
-  memkind* vmp;
-  int err = CALL_MEMKIND(memkind_create_pmem, FLAGS_nvm_cache_path.c_str(), capacity, &vmp);
-  // If we cannot create the cache pool we should not retry.
-  PLOG_IF(FATAL, err) << "Could not initialize NVM cache library in path "
-                           << FLAGS_nvm_cache_path.c_str();
-
-  return new ShardedLRUCache(capacity, id, vmp);
-}
-
-bool CanUseNVMCacheForTests() {
-  std::call_once(g_memkind_ops_flag, InitMemkindOps);
-  return g_memkind_available;
-}
-
-} // namespace kudu
diff --git a/be/src/util/cache/nvm_cache.h b/be/src/util/cache/nvm_cache.h
deleted file mode 100644
index d6c2762..0000000
--- a/be/src/util/cache/nvm_cache.h
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include <cstddef>
-#include <string>
-
-#include "kudu/util/cache.h"
-
-namespace kudu {
-
-// Convenience macro for invoking CanUseNVMCacheForTests.
-#define RETURN_IF_NO_NVM_CACHE(memory_type) do { \
-  if ((memory_type) == Cache::MemoryType::NVM && !CanUseNVMCacheForTests()) { \
-    LOG(WARNING) << "test is skipped; NVM cache cannot be created"; \
-    return; \
-  } \
-} while (0)
-
-// Returns true if an NVM cache can be created on this machine; false otherwise.
-//
-// Only intended for use in unit tests.
-bool CanUseNVMCacheForTests();
-
-// Create a new LRU cache with a fixed size capacity. This implementation
-// of Cache uses the least-recently-used eviction policy and stored in NVM.
-template<>
-Cache* NewCache<Cache::EvictionPolicy::LRU,
-                Cache::MemoryType::NVM>(size_t capacity, const std::string& id);
-
-}  // namespace kudu
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 495fdcb..b992ad1 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -78,7 +78,6 @@ be/src/kudu/security/x509_check_host.h
 be/src/kudu/security/x509_check_host.cc
 be/src/util/cache/cache.h
 be/src/util/cache/cache.cc
-be/src/util/cache/nvm_cache.cc
 be/src/util/cache/cache-test.cc
 
 # http://www.apache.org/legal/src-headers.html: "Short informational text files; for