You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/02/20 23:54:13 UTC

[impala] branch master updated (e04a839 -> 091faeb)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from e04a839  IMPALA-9366: Remove embedded pointer reference in PhjBuilder::CodegenInsertRuntimeFilters
     new eead187  IMPALA-8690 (prep 1): Copy Kudu cache code to be/src/util/cache
     new 091faeb  IMPALA-8690 (prep 2): Switch DataCache to Impala's cache implementation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/CMakeLists.txt                                |  2 +
 be/src/runtime/io/data-cache.cc                  | 16 ++--
 be/src/runtime/io/data-cache.h                   | 12 +--
 be/src/util/CMakeLists.txt                       |  2 +
 be/src/{statestore => util/cache}/CMakeLists.txt | 24 +++---
 be/src/{kudu/util => util/cache}/cache-bench.cc  | 24 +++---
 be/src/{kudu/util => util/cache}/cache-test.cc   | 80 ++++---------------
 be/src/{kudu/util => util/cache}/cache.cc        | 98 ++++--------------------
 be/src/{kudu/util => util/cache}/cache.h         | 12 +--
 bin/rat_exclude_files.txt                        |  3 +
 10 files changed, 79 insertions(+), 194 deletions(-)
 copy be/src/{statestore => util/cache}/CMakeLists.txt (60%)
 copy be/src/{kudu/util => util/cache}/cache-bench.cc (91%)
 copy be/src/{kudu/util => util/cache}/cache-test.cc (83%)
 copy be/src/{kudu/util => util/cache}/cache.cc (86%)
 copy be/src/{kudu/util => util/cache}/cache.h (98%)


[impala] 02/02: IMPALA-8690 (prep 2): Switch DataCache to Impala's cache implementation

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 091faebe61645c9f86f59bc67668688f15a2fc7c
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Feb 5 17:23:20 2020 -0800

    IMPALA-8690 (prep 2): Switch DataCache to Impala's cache implementation
    
    In a previous change, the Kudu caching code from be/src/kudu/util was
    copied to be/src/util/cache. This gets that code workable for Impala
    and uses it for the DataCache.
    
    This involves the following changes:
    1. Fix up includes to point to Impala's cache implementation
    2. Remove NVM Cache support and code (not used by Impala)
    3. Remove CacheMetrics code (not used by Impala)
    4. Move the cache code to the impala namespace
    5. Modify cache-bench and cache-test to use Impala's backend
       test infrastructure
    6. Switch DataCache to use be/src/util/cache
    
    These are only boilerplate changes. The cache implementation has not
    meaningfully changed.
    
    Testing:
     - Ran core tests
     - The modified version of Kudu's cache-test passes
     - The modified version of Kudu's cache-bench runs successfully
    
    Change-Id: I917f8352c9276373dd2761af986bf3487855271c
    Reviewed-on: http://gerrit.cloudera.org:8080/15170
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/CMakeLists.txt                |   2 +
 be/src/runtime/io/data-cache.cc  |  16 +-
 be/src/runtime/io/data-cache.h   |  12 +-
 be/src/util/CMakeLists.txt       |   2 +
 be/src/util/cache/CMakeLists.txt |  12 +-
 be/src/util/cache/cache-bench.cc |  24 +-
 be/src/util/cache/cache-test.cc  |  80 +----
 be/src/util/cache/cache.cc       |  98 +-----
 be/src/util/cache/cache.h        |  12 +-
 be/src/util/cache/nvm_cache.cc   | 719 ---------------------------------------
 be/src/util/cache/nvm_cache.h    |  45 ---
 bin/rat_exclude_files.txt        |   1 -
 12 files changed, 71 insertions(+), 952 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index c2d3ea6..42fa315 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -416,6 +416,7 @@ set (IMPALA_LIBS
   token_proto
   Udf
   Util
+  UtilCache
 )
 
 set (IMPALA_LINK_LIBS
@@ -454,6 +455,7 @@ set (UNIFIED_TEST_LIBS
   SchedulingTests
   ServiceTests
   UtilTests
+  UtilCacheTests
 )
 set (UNIFIED_TEST_LINK_LIBS
   ${WL_START_GROUP}
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 85dd277..4f74566 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -29,7 +29,6 @@
 
 #include "exec/kudu-util.h"
 #include "kudu/util/async_logger.h"
-#include "kudu/util/cache.h"
 #include "kudu/util/env.h"
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/locks.h"
@@ -40,6 +39,7 @@
 #include "gutil/strings/split.h"
 #include "gutil/walltime.h"
 #include "util/bit-util.h"
+#include "util/cache/cache.h"
 #include "util/error-util.h"
 #include "util/filesystem-util.h"
 #include "util/hash-util.h"
@@ -446,7 +446,7 @@ DataCache::Partition::Partition(
     capacity_(max<int64_t>(capacity, PAGE_SIZE)),
     max_opened_files_(max_opened_files),
     meta_cache_(
-        kudu::NewCache<kudu::Cache::EvictionPolicy::LRU, kudu::Cache::MemoryType::DRAM>(
+        NewCache<Cache::EvictionPolicy::LRU, Cache::MemoryType::DRAM>(
             capacity_, path_)) {}
 
 DataCache::Partition::~Partition() {
@@ -553,8 +553,7 @@ int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to
     uint8_t* buffer) {
   DCHECK(!closed_);
   Slice key = cache_key.ToSlice();
-  kudu::Cache::UniqueHandle handle(
-      meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE));
+  Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
 
   if (handle.get() == nullptr) {
     if (tracer_ != nullptr) {
@@ -589,7 +588,7 @@ int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to
 }
 
 bool DataCache::Partition::HandleExistingEntry(const Slice& key,
-    const kudu::Cache::UniqueHandle& handle, const uint8_t* buffer, int64_t buffer_len) {
+    const Cache::UniqueHandle& handle, const uint8_t* buffer, int64_t buffer_len) {
   // Unpack the cache entry.
   CacheEntry entry(meta_cache_->Value(handle));
 
@@ -612,7 +611,7 @@ bool DataCache::Partition::InsertIntoCache(const Slice& key, CacheFile* cache_fi
   const int64_t charge_len = BitUtil::RoundUp(buffer_len, PAGE_SIZE);
 
   // Allocate a cache handle
-  kudu::Cache::UniquePendingHandle pending_handle(
+  Cache::UniquePendingHandle pending_handle(
       meta_cache_->Allocate(key, sizeof(CacheEntry), charge_len));
   if (UNLIKELY(pending_handle.get() == nullptr)) return false;
 
@@ -629,7 +628,7 @@ bool DataCache::Partition::InsertIntoCache(const Slice& key, CacheFile* cache_fi
   // Insert the new entry into the cache.
   CacheEntry entry(cache_file, insertion_offset, buffer_len, checksum);
   memcpy(meta_cache_->MutableValue(&pending_handle), &entry, sizeof(CacheEntry));
-  kudu::Cache::UniqueHandle handle(meta_cache_->Insert(std::move(pending_handle), this));
+  Cache::UniqueHandle handle(meta_cache_->Insert(std::move(pending_handle), this));
   pending_handle = nullptr;
   ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->Increment(charge_len);
   return true;
@@ -645,8 +644,7 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe
 
   // Check for existing entry.
   {
-    kudu::Cache::UniqueHandle handle(
-        meta_cache_->Lookup(key, kudu::Cache::EXPECT_IN_CACHE));
+    Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
     if (handle.get() != nullptr) {
       if (HandleExistingEntry(key, handle, buffer, buffer_len)) return false;
     }
diff --git a/be/src/runtime/io/data-cache.h b/be/src/runtime/io/data-cache.h
index fde1e97..1dd3e38 100644
--- a/be/src/runtime/io/data-cache.h
+++ b/be/src/runtime/io/data-cache.h
@@ -25,16 +25,12 @@
 #include <gtest/gtest_prod.h>
 
 #include "common/status.h"
+#include "util/cache/cache.h"
 #include "util/spinlock.h"
 #include "util/thread-pool.h"
-#include "kudu/util/cache.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 
-namespace kudu {
-class Cache;
-} // kudu
-
 /// This class is an implementation of an IO data cache which is backed by local storage.
 /// It implicitly relies on the OS page cache management to shuffle data between memory
 /// and the storage device. This is useful for caching data read from remote filesystems
@@ -192,7 +188,7 @@ class DataCache {
 
   /// An implementation of a cache partition. Each partition maintains its own set of
   /// cache keys in a LRU cache.
-  class Partition : public kudu::Cache::EvictionCallback {
+  class Partition : public Cache::EvictionCallback {
    public:
     /// Creates a partition at the given directory 'path' with quota 'capacity' in bytes.
     /// 'max_opened_files' is the maximum number of opened files allowed per partition.
@@ -298,7 +294,7 @@ class DataCache {
     ///
     /// A cache entry has type CacheEntry and it contains the metadata of the cached
     /// content. Please see comments at CachedEntry for details.
-    std::unique_ptr<kudu::Cache> meta_cache_;
+    std::unique_ptr<Cache> meta_cache_;
 
     std::unique_ptr<Tracer> tracer_;
 
@@ -325,7 +321,7 @@ class DataCache {
     /// work needs to be done. Returns false otherwise. In which case, the existing entry
     /// will be overwritten.
     bool HandleExistingEntry(const kudu::Slice& key,
-        const kudu::Cache::UniqueHandle& handle, const uint8_t* buffer,
+        const Cache::UniqueHandle& handle, const uint8_t* buffer,
         int64_t buffer_len);
 
     /// Helper function to insert a new entry with key 'key' into the LRU cache.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index ba4e94f..21127cb 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+add_subdirectory(cache)
+
 set(SQUEASEL_SRC_DIR "${CMAKE_SOURCE_DIR}/be/src/thirdparty/squeasel")
 set(MUSTACHE_SRC_DIR "${CMAKE_SOURCE_DIR}/be/src/thirdparty/mustache")
 set(MPFIT_SRC_DIR "${CMAKE_SOURCE_DIR}/be/src/thirdparty/mpfit")
diff --git a/be/src/util/cache/CMakeLists.txt b/be/src/util/cache/CMakeLists.txt
index c6065e6..cb3595c 100644
--- a/be/src/util/cache/CMakeLists.txt
+++ b/be/src/util/cache/CMakeLists.txt
@@ -23,9 +23,15 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/util/cache")
 
 add_library(UtilCache
   cache.cc
-  nvm_cache.cc
 )
 add_dependencies(UtilCache gen-deps gen_ir_descriptions)
 
-# Currently does not build cache-test or cache-bench, because they depend on Kudu
-# build infrastructure.
+add_library(UtilCacheTests STATIC
+  cache-test.cc
+)
+add_dependencies(UtilCacheTests gen-deps gen_ir_descriptions)
+
+add_executable(cache-bench cache-bench.cc)
+target_link_libraries(cache-bench ${IMPALA_TEST_LINK_LIBS})
+
+ADD_UNIFIED_BE_LSAN_TEST(cache-test "CacheTypes/CacheTest.*:CacheTypes/LRUCacheTest.*:FIFOCacheTest.*")
diff --git a/be/src/util/cache/cache-bench.cc b/be/src/util/cache/cache-bench.cc
index 91026ae..ec26484 100644
--- a/be/src/util/cache/cache-bench.cc
+++ b/be/src/util/cache/cache-bench.cc
@@ -33,12 +33,12 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/human_readable.h"
-#include "kudu/util/cache.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/test_util.h"
+#include "util/cache/cache.h"
+#include "testutil/gtest-util.h"
 
 DEFINE_int32(num_threads, 16, "The number of threads to access the cache concurrently.");
 DEFINE_int32(run_seconds, 1, "The number of seconds to run the benchmark");
@@ -50,7 +50,7 @@ using std::thread;
 using std::unique_ptr;
 using std::vector;
 
-namespace kudu {
+namespace impala {
 
 // Benchmark a 1GB cache.
 static constexpr int kCacheCapacity = 1024 * 1024 * 1024;
@@ -90,11 +90,10 @@ struct BenchSetup {
   }
 };
 
-class CacheBench : public KuduTest,
+class CacheBench : public testing::Test,
                    public testing::WithParamInterface<BenchSetup>{
  public:
   void SetUp() override {
-    KuduTest::SetUp();
     cache_.reset(NewCache(kCacheCapacity, "test-cache"));
   }
 
@@ -102,15 +101,18 @@ class CacheBench : public KuduTest,
   // Returns a pair of the number of cache hits and lookups.
   pair<int64_t, int64_t> DoQueries(const atomic<bool>* done) {
     const BenchSetup& setup = GetParam();
-    Random r(GetRandomSeed32());
+    kudu::Random r(kudu::GetRandomSeed32());
     int64_t lookups = 0;
     int64_t hits = 0;
+    // Add max_key variable and test to avoid division by zero warning from clang-tidy
+    uint32_t max_key = setup.max_key();
+    if (max_key == 0) return {0, 0};
     while (!*done) {
       uint32_t int_key;
       if (setup.pattern == BenchSetup::Pattern::ZIPFIAN) {
-        int_key = r.Skewed(Bits::Log2Floor(setup.max_key()));
+        int_key = r.Skewed(Bits::Log2Floor(max_key));
       } else {
-        int_key = r.Uniform(setup.max_key());
+        int_key = r.Uniform(max_key);
       }
       char key_buf[sizeof(int_key)];
       memcpy(key_buf, &int_key, sizeof(int_key));
@@ -142,7 +144,7 @@ class CacheBench : public KuduTest,
           total_lookups += hits_lookups.second;
         });
     }
-    SleepFor(MonoDelta::FromSeconds(n_seconds));
+    kudu::SleepFor(kudu::MonoDelta::FromSeconds(n_seconds));
     done = true;
     for (auto& t : threads) {
       t.join();
@@ -184,4 +186,6 @@ TEST_P(CacheBench, RunBench) {
   LOG(INFO) << test_case << ": " << StringPrintf("%.1f", hit_rate * 100.0) << "% hit rate";
 }
 
-} // namespace kudu
+} // namespace impala
+
+IMPALA_TEST_MAIN();
diff --git a/be/src/util/cache/cache-test.cc b/be/src/util/cache/cache-test.cc
index 6ba34b2..265b0b9 100644
--- a/be/src/util/cache/cache-test.cc
+++ b/be/src/util/cache/cache-test.cc
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include "kudu/util/cache.h"
+#include "util/cache/cache.h"
 
 #include <cstring>
 #include <memory>
@@ -18,20 +18,14 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/block_cache_metrics.h"
-#include "kudu/util/cache_metrics.h"
 #include "kudu/util/coding.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
-#include "kudu/util/nvm_cache.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
 
 DECLARE_bool(cache_force_single_shard);
-DECLARE_string(nvm_cache_path);
 
 DECLARE_double(cache_memtracker_approximation_ratio);
 
@@ -42,17 +36,17 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
-namespace kudu {
+namespace impala {
 
 // Conversions between numeric keys/values and the types expected by Cache.
 static std::string EncodeInt(int k) {
-  faststring result;
-  PutFixed32(&result, k);
+  kudu::faststring result;
+  kudu::PutFixed32(&result, k);
   return result.ToString();
 }
 static int DecodeInt(const Slice& k) {
   CHECK_EQ(4, k.size());
-  return DecodeFixed32(k.data());
+  return kudu::DecodeFixed32(k.data());
 }
 
 // Cache sharding policy affects the composition of the cache. Some test
@@ -62,7 +56,7 @@ enum class ShardingPolicy {
   SingleShard,
 };
 
-class CacheBaseTest : public KuduTest,
+class CacheBaseTest : public ::testing::Test,
                       public Cache::EvictionCallback {
  public:
   explicit CacheBaseTest(size_t cache_size)
@@ -110,11 +104,6 @@ class CacheBaseTest : public KuduTest,
     FLAGS_cache_force_single_shard =
         (sharding_policy == ShardingPolicy::SingleShard);
 
-    if (google::GetCommandLineFlagInfoOrDie("nvm_cache_path").is_default) {
-      FLAGS_nvm_cache_path = GetTestPath("nvm-cache");
-      ASSERT_OK(Env::Default()->CreateDir(FLAGS_nvm_cache_path));
-    }
-
     switch (eviction_policy) {
       case Cache::EvictionPolicy::FIFO:
         if (mem_type != Cache::MemoryType::DRAM) {
@@ -123,7 +112,7 @@ class CacheBaseTest : public KuduTest,
         cache_.reset(NewCache<Cache::EvictionPolicy::FIFO,
                               Cache::MemoryType::DRAM>(cache_size(),
                                                        "cache_test"));
-        MemTracker::FindTracker("cache_test-sharded_fifo_cache", &mem_tracker_);
+        kudu::MemTracker::FindTracker("cache_test-sharded_fifo_cache", &mem_tracker_);
         break;
       case Cache::EvictionPolicy::LRU:
         switch (mem_type) {
@@ -132,22 +121,13 @@ class CacheBaseTest : public KuduTest,
                                   Cache::MemoryType::DRAM>(cache_size(),
                                                            "cache_test"));
             break;
-          case Cache::MemoryType::NVM:
-            if (CanUseNVMCacheForTests()) {
-              cache_.reset(NewCache<Cache::EvictionPolicy::LRU,
-                                    Cache::MemoryType::NVM>(cache_size(),
-                                                            "cache_test"));
-            }
-            break;
           default:
             FAIL() << mem_type << ": unrecognized cache memory type";
-            break;
         }
-        MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
+        kudu::MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
         break;
       default:
         FAIL() << "unrecognized cache eviction policy";
-        break;
     }
 
     // Since nvm cache does not have memtracker due to the use of
@@ -155,23 +135,13 @@ class CacheBaseTest : public KuduTest,
     if (mem_type == Cache::MemoryType::DRAM) {
       ASSERT_TRUE(mem_tracker_.get());
     }
-
-    // cache_ will be null if we're trying to set up a test for the NVM cache
-    // and were unable to do so.
-    if (cache_) {
-      scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(
-          &metric_registry_, "test");
-      unique_ptr<BlockCacheMetrics> metrics(new BlockCacheMetrics(entity));
-      cache_->SetMetrics(std::move(metrics));
-    }
   }
 
   const size_t cache_size_;
   vector<int> evicted_keys_;
   vector<int> evicted_values_;
-  shared_ptr<MemTracker> mem_tracker_;
+  shared_ptr<kudu::MemTracker> mem_tracker_;
   unique_ptr<Cache> cache_;
-  MetricRegistry metric_registry_;
 };
 
 class CacheTest :
@@ -206,16 +176,9 @@ INSTANTIATE_TEST_CASE_P(
                    ShardingPolicy::MultiShard),
         make_tuple(Cache::MemoryType::DRAM,
                    Cache::EvictionPolicy::LRU,
-                   ShardingPolicy::SingleShard),
-        make_tuple(Cache::MemoryType::NVM,
-                   Cache::EvictionPolicy::LRU,
-                   ShardingPolicy::MultiShard),
-        make_tuple(Cache::MemoryType::NVM,
-                   Cache::EvictionPolicy::LRU,
                    ShardingPolicy::SingleShard)));
 
 TEST_P(CacheTest, TrackMemory) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   if (mem_tracker_) {
     Insert(100, 100, 1);
     ASSERT_EQ(1, mem_tracker_->consumption());
@@ -226,7 +189,6 @@ TEST_P(CacheTest, TrackMemory) {
 }
 
 TEST_P(CacheTest, HitAndMiss) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   ASSERT_EQ(-1, Lookup(100));
 
   Insert(100, 101);
@@ -250,7 +212,6 @@ TEST_P(CacheTest, HitAndMiss) {
 }
 
 TEST_P(CacheTest, Erase) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   Erase(200);
   ASSERT_EQ(0, evicted_keys_.size());
 
@@ -270,7 +231,6 @@ TEST_P(CacheTest, Erase) {
 }
 
 TEST_P(CacheTest, EntriesArePinned) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   Insert(100, 101);
   auto h1 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
   ASSERT_EQ(101, DecodeInt(cache_->Value(h1)));
@@ -299,7 +259,6 @@ TEST_P(CacheTest, EntriesArePinned) {
 // size of items still in the cache, which must be approximately the
 // same as the total capacity.
 TEST_P(CacheTest, HeavyEntries) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   const int kLight = cache_size() / 1000;
   const int kHeavy = cache_size() / 100;
   int added = 0;
@@ -324,7 +283,6 @@ TEST_P(CacheTest, HeavyEntries) {
 }
 
 TEST_P(CacheTest, InvalidateAllEntries) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   constexpr const int kEntriesNum = 1024;
   // This scenarios assumes no evictions are done at the cache capacity.
   ASSERT_LE(kEntriesNum, cache_size());
@@ -348,7 +306,6 @@ TEST_P(CacheTest, InvalidateAllEntries) {
 }
 
 TEST_P(CacheTest, InvalidateNoEntries) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   constexpr const int kEntriesNum = 10;
   // This scenarios assumes no evictions are done at the cache capacity.
   ASSERT_LE(kEntriesNum, cache_size());
@@ -370,7 +327,6 @@ TEST_P(CacheTest, InvalidateNoEntries) {
 }
 
 TEST_P(CacheTest, InvalidateNoEntriesNoAdvanceIterationFunctor) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   constexpr const int kEntriesNum = 256;
   // This scenarios assumes no evictions are done at the cache capacity.
   ASSERT_LE(kEntriesNum, cache_size());
@@ -398,7 +354,6 @@ TEST_P(CacheTest, InvalidateNoEntriesNoAdvanceIterationFunctor) {
 }
 
 TEST_P(CacheTest, InvalidateOddKeyEntries) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   constexpr const int kEntriesNum = 64;
   // This scenarios assumes no evictions are done at the cache capacity.
   ASSERT_LE(kEntriesNum, cache_size());
@@ -489,8 +444,7 @@ TEST_F(FIFOCacheTest, EvictionPolicy) {
 
 class LRUCacheTest :
     public CacheBaseTest,
-    public ::testing::WithParamInterface<tuple<Cache::MemoryType,
-                                               ShardingPolicy>> {
+    public ::testing::WithParamInterface<ShardingPolicy> {
  public:
   LRUCacheTest()
       : CacheBaseTest(16 * 1024 * 1024) {
@@ -498,21 +452,18 @@ class LRUCacheTest :
 
   void SetUp() override {
     const auto& param = GetParam();
-    SetupWithParameters(std::get<0>(param),
+    SetupWithParameters(Cache::MemoryType::DRAM,
                         Cache::EvictionPolicy::LRU,
-                        std::get<1>(param));
+                        param);
   }
 };
 
 INSTANTIATE_TEST_CASE_P(
     CacheTypes, LRUCacheTest,
-    ::testing::Combine(::testing::Values(Cache::MemoryType::DRAM,
-                                         Cache::MemoryType::NVM),
-                       ::testing::Values(ShardingPolicy::MultiShard,
-                                         ShardingPolicy::SingleShard)));
+    ::testing::Values(ShardingPolicy::MultiShard,
+                      ShardingPolicy::SingleShard));
 
 TEST_P(LRUCacheTest, EvictionPolicy) {
-  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
   static constexpr int kNumElems = 1000;
   const int size_per_elem = cache_size() / kNumElems;
 
@@ -532,4 +483,5 @@ TEST_P(LRUCacheTest, EvictionPolicy) {
   ASSERT_EQ(-1, Lookup(200));
 }
 
-}  // namespace kudu
+}  // namespace impala
+
diff --git a/be/src/util/cache/cache.cc b/be/src/util/cache/cache.cc
index 073fa51..75e6ff6 100644
--- a/be/src/util/cache/cache.cc
+++ b/be/src/util/cache/cache.cc
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include "kudu/util/cache.h"
+#include "util/cache/cache.h"
 
 #include <atomic>
 #include <cstdint>
@@ -26,24 +26,16 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
 #include "kudu/util/alignment.h"
-#include "kudu/util/cache_metrics.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
 #include "kudu/util/slice.h"
-#include "kudu/util/test_util_prod.h"
 
 // Useful in tests that require accurate cache capacity accounting.
-DEFINE_bool(cache_force_single_shard, false,
-            "Override all cache implementations to use just one shard");
-TAG_FLAG(cache_force_single_shard, hidden);
+DECLARE_bool(cache_force_single_shard);
 
-DEFINE_double(cache_memtracker_approximation_ratio, 0.01,
-              "The MemTracker associated with a cache can accumulate error up to "
-              "this ratio to improve performance. For tests.");
-TAG_FLAG(cache_memtracker_approximation_ratio, hidden);
+DECLARE_double(cache_memtracker_approximation_ratio);
 
 using std::atomic;
 using std::shared_ptr;
@@ -51,7 +43,7 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
-namespace kudu {
+namespace impala {
 
 Cache::~Cache() {
 }
@@ -200,7 +192,6 @@ string ToString(Cache::EvictionPolicy p) {
       return "lru";
     default:
       LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
-      break;
   }
   return "unknown";
 }
@@ -209,7 +200,7 @@ string ToString(Cache::EvictionPolicy p) {
 template<Cache::EvictionPolicy policy>
 class CacheShard {
  public:
-  explicit CacheShard(MemTracker* tracker);
+  explicit CacheShard(kudu::MemTracker* tracker);
   ~CacheShard();
 
   // Separate from constructor so caller can easily make an array of CacheShard
@@ -218,8 +209,6 @@ class CacheShard {
     max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
   }
 
-  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
-
   Cache::Handle* Insert(RLHandle* handle, Cache::EvictionCallback* eviction_callback);
   // Like Cache::Lookup, but with an extra "hash" parameter.
   Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
@@ -259,14 +248,11 @@ class CacheShard {
   // Positive delta indicates an increased memory consumption.
   void UpdateMemTracker(int64_t delta);
 
-  // Update the metrics for a lookup operation in the cache.
-  void UpdateMetricsLookup(bool was_hit, bool caching);
-
   // Initialized before use.
   size_t capacity_;
 
   // mutex_ protects the following state.
-  simple_spinlock mutex_;
+  kudu::simple_spinlock mutex_;
   size_t usage_;
 
   // Dummy head of recency list.
@@ -275,21 +261,18 @@ class CacheShard {
 
   HandleTable table_;
 
-  MemTracker* mem_tracker_;
+  kudu::MemTracker* mem_tracker_;
   atomic<int64_t> deferred_consumption_ { 0 };
 
   // Initialized based on capacity_ to ensure an upper bound on the error on the
   // MemTracker consumption.
   int64_t max_deferred_consumption_;
-
-  CacheMetrics* metrics_;
 };
 
 template<Cache::EvictionPolicy policy>
-CacheShard<policy>::CacheShard(MemTracker* tracker)
+CacheShard<policy>::CacheShard(kudu::MemTracker* tracker)
     : usage_(0),
-      mem_tracker_(tracker),
-      metrics_(nullptr) {
+      mem_tracker_(tracker) {
   // Make empty circular linked list.
   rl_.next = &rl_;
   rl_.prev = &rl_;
@@ -322,10 +305,6 @@ void CacheShard<policy>::FreeEntry(RLHandle* e) {
     e->eviction_callback->EvictedEntry(e->key(), e->value());
   }
   UpdateMemTracker(-static_cast<int64_t>(e->charge));
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->cache_usage->DecrementBy(e->charge);
-    metrics_->evictions->Increment();
-  }
   delete [] e;
 }
 
@@ -342,26 +321,6 @@ void CacheShard<policy>::UpdateMemTracker(int64_t delta) {
 }
 
 template<Cache::EvictionPolicy policy>
-void CacheShard<policy>::UpdateMetricsLookup(bool was_hit, bool caching) {
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->lookups->Increment();
-    if (was_hit) {
-      if (caching) {
-        metrics_->cache_hits_caching->Increment();
-      } else {
-        metrics_->cache_hits->Increment();
-      }
-    } else {
-      if (caching) {
-        metrics_->cache_misses_caching->Increment();
-      } else {
-        metrics_->cache_misses->Increment();
-      }
-    }
-  }
-}
-
-template<Cache::EvictionPolicy policy>
 void CacheShard<policy>::RL_Remove(RLHandle* e) {
   e->next->prev = e->prev;
   e->prev->next = e->next;
@@ -403,9 +362,6 @@ Cache::Handle* CacheShard<policy>::Lookup(const Slice& key,
     }
   }
 
-  // Do the metrics outside of the lock.
-  UpdateMetricsLookup(e != nullptr, caching);
-
   return reinterpret_cast<Cache::Handle*>(e);
 }
 
@@ -428,10 +384,6 @@ Cache::Handle* CacheShard<policy>::Insert(
   // Two refs for the handle: one from CacheShard, one for the returned handle.
   handle->refs.store(2, std::memory_order_relaxed);
   UpdateMemTracker(handle->charge);
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->cache_usage->IncrementBy(handle->charge);
-    metrics_->inserts->Increment();
-  }
 
   RLHandle* to_remove_head = nullptr;
   {
@@ -550,7 +502,7 @@ class ShardedCache : public Cache {
     // A cache is often a singleton, so:
     // 1. We reuse its MemTracker if one already exists, and
     // 2. It is directly parented to the root MemTracker.
-    mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
+    mem_tracker_ = kudu::MemTracker::FindOrCreateGlobalTracker(
         -1, strings::Substitute("$0-sharded_$1_cache", id, ToString(policy)));
 
     int num_shards = 1 << shard_bits_;
@@ -567,22 +519,6 @@ class ShardedCache : public Cache {
     STLDeleteElements(&shards_);
   }
 
-  void SetMetrics(std::unique_ptr<CacheMetrics> metrics) override {
-    // TODO(KUDU-2165): reuse of the Cache singleton across multiple MiniCluster servers
-    // causes TSAN errors. So, we'll ensure that metrics only get attached once, from
-    // whichever server starts first. This has the downside that, in test builds, we won't
-    // get accurate cache metrics, but that's probably better than spurious failures.
-    std::lock_guard<decltype(metrics_lock_)> l(metrics_lock_);
-    if (metrics_) {
-      CHECK(IsGTest()) << "Metrics should only be set once per Cache singleton";
-      return;
-    }
-    metrics_ = std::move(metrics);
-    for (auto* cache : shards_) {
-      cache->SetMetrics(metrics_.get());
-    }
-  }
-
   UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override {
     const uint32_t hash = HashSlice(key);
     return UniqueHandle(
@@ -624,7 +560,7 @@ class ShardedCache : public Cache {
     // TODO(KUDU-1091): account for the footprint of structures used by Cache's
     //                  internal housekeeping (RL handles, etc.) in case of
     //                  non-automatic charge.
-    handle->charge = (charge == kAutomaticCharge) ? kudu_malloc_usable_size(h.get())
+    handle->charge = (charge == kAutomaticCharge) ? kudu::kudu_malloc_usable_size(h.get())
                                                   : charge;
     handle->hash = HashSlice(key);
     memcpy(handle->kv_data, key.data(), key_len);
@@ -667,16 +603,11 @@ class ShardedCache : public Cache {
     return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
   }
 
-  shared_ptr<MemTracker> mem_tracker_;
-  unique_ptr<CacheMetrics> metrics_;
+  shared_ptr<kudu::MemTracker> mem_tracker_;
   vector<CacheShard<policy>*> shards_;
 
   // Number of bits of hash used to determine the shard.
   const int shard_bits_;
-
-  // Protects 'metrics_'. Used only when metrics are set, to ensure
-  // that they are set only once in test environments.
-  simple_spinlock metrics_lock_;
 };
 
 }  // end anonymous namespace
@@ -698,9 +629,6 @@ std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
     case Cache::MemoryType::DRAM:
       os << "DRAM";
       break;
-    case Cache::MemoryType::NVM:
-      os << "NVM";
-      break;
     default:
       os << "unknown (" << static_cast<int>(mem_type) << ")";
       break;
@@ -708,4 +636,4 @@ std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
   return os;
 }
 
-}  // namespace kudu
+}  // namespace impala
diff --git a/be/src/util/cache/cache.h b/be/src/util/cache/cache.h
index ff3a309..a64fe19 100644
--- a/be/src/util/cache/cache.h
+++ b/be/src/util/cache/cache.h
@@ -28,16 +28,15 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/util/slice.h"
 
-namespace kudu {
+using kudu::Slice;
 
-struct CacheMetrics;
+namespace impala {
 
 class Cache {
  public:
   // Type of memory backing the cache's storage.
   enum class MemoryType {
-    DRAM,
-    NVM,
+    DRAM
   };
 
   // Supported eviction policies for the cache. Eviction policy determines what
@@ -65,9 +64,6 @@ class Cache {
   // function that was passed to the constructor.
   virtual ~Cache();
 
-  // Set the cache metrics to update corresponding counters accordingly.
-  virtual void SetMetrics(std::unique_ptr<CacheMetrics> metrics) = 0;
-
   // Opaque handle to an entry stored in the cache.
   struct Handle { };
 
@@ -360,4 +356,4 @@ Cache* NewCache<Cache::EvictionPolicy::LRU,
 // A helper method to output cache memory type into ostream.
 std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type);
 
-} // namespace kudu
+} // namespace impala
diff --git a/be/src/util/cache/nvm_cache.cc b/be/src/util/cache/nvm_cache.cc
deleted file mode 100644
index d7860ac..0000000
--- a/be/src/util/cache/nvm_cache.cc
+++ /dev/null
@@ -1,719 +0,0 @@
-// This file is derived from cache.cc in the LevelDB project:
-//
-//   Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
-//   Use of this source code is governed by a BSD-style license that can be
-//   found in the LICENSE file.
-//
-// ------------------------------------------------------------
-// This file implements a cache based on the MEMKIND library (http://memkind.github.io/memkind/)
-// This library makes it easy to program against persistent memory hardware by exposing an API
-// which parallels malloc/free, but allocates from persistent memory instead of DRAM.
-//
-// We use this API to implement a cache which treats persistent memory or
-// non-volatile memory as if it were a larger cheaper bank of volatile memory. We
-// currently make no use of its persistence properties.
-//
-// Currently, we only store key/value in NVM. All other data structures such as the
-// ShardedLRUCache instances, hash table, etc are in DRAM. The assumption is that
-// the ratio of data stored vs overhead is quite high.
-
-#include "kudu/util/nvm_cache.h"
-
-#include <dlfcn.h>
-
-#include <cstdint>
-#include <cstring>
-#include <iostream>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
-#include <glog/logging.h>
-
-#include "kudu/gutil/atomic_refcount.h"
-#include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/bits.h"
-#include "kudu/gutil/dynamic_annotations.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/hash/city.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/sysinfo.h"
-#include "kudu/util/cache.h"
-#include "kudu/util/cache_metrics.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/slice.h"
-#include "kudu/util/status.h"
-
-#ifndef MEMKIND_PMEM_MIN_SIZE
-#define MEMKIND_PMEM_MIN_SIZE (1024 * 1024 * 16) // Taken from memkind 1.9.0.
-#endif
-
-struct memkind;
-
-// Useful in tests that require accurate cache capacity accounting.
-DECLARE_bool(cache_force_single_shard);
-
-DEFINE_string(nvm_cache_path, "/pmem",
-              "The path at which the NVM cache will try to allocate its memory. "
-              "This can be a tmpfs or ramfs for testing purposes.");
-
-DEFINE_bool(nvm_cache_simulate_allocation_failure, false,
-            "If true, the NVM cache will inject failures in calls to memkind_malloc "
-            "for testing.");
-TAG_FLAG(nvm_cache_simulate_allocation_failure, unsafe);
-
-using std::string;
-using std::unique_ptr;
-using std::vector;
-using strings::Substitute;
-
-namespace kudu {
-
-namespace {
-
-// Taken together, these typedefs and this macro make it easy to call a
-// memkind function:
-//
-//  CALL_MEMKIND(memkind_malloc, vmp_, size);
-typedef int (*memkind_create_pmem)(const char*, size_t, memkind**);
-typedef int (*memkind_destroy_kind)(memkind*);
-typedef void* (*memkind_malloc)(memkind*, size_t);
-typedef size_t (*memkind_malloc_usable_size)(memkind*, void*);
-typedef void (*memkind_free)(memkind*, void*);
-#define CALL_MEMKIND(func_name, ...) ((func_name)g_##func_name)(__VA_ARGS__)
-
-// Function pointers into memkind; set by InitMemkindOps().
-void* g_memkind_create_pmem;
-void* g_memkind_destroy_kind;
-void* g_memkind_malloc;
-void* g_memkind_malloc_usable_size;
-void* g_memkind_free;
-
-// After InitMemkindOps() is called, true if memkind is available and safe
-// to use, false otherwise.
-bool g_memkind_available;
-
-std::once_flag g_memkind_ops_flag;
-
-// Try to dlsym() a particular symbol from 'handle', storing the result in 'ptr'
-// if successful.
-Status TryDlsym(void* handle, const char* sym, void** ptr) {
-  dlerror(); // Need to clear any existing error first.
-  void* ret = dlsym(handle, sym);
-  char* error = dlerror();
-  if (error) {
-    return Status::NotSupported(Substitute("could not dlsym $0", sym), error);
-  }
-  *ptr = ret;
-  return Status::OK();
-}
-
-// Try to dlopen() memkind and set up all the function pointers we need from it.
-//
-// Note: in terms of protecting ourselves against changes in memkind, we'll
-// notice (and fail) if a symbol is missing, but not if it's signature has
-// changed or if there's some subtle behavioral change. A scan of the memkind
-// repo suggests that backwards compatibility is enforced: symbols are only
-// added and behavioral changes are effected via the introduction of new symbols.
-void InitMemkindOps() {
-  g_memkind_available = false;
-
-  // Use RTLD_NOW so that if any of memkind's dependencies aren't satisfied
-  // (e.g. libnuma is too old and is missing symbols), we'll know up front
-  // instead of during cache operations.
-  void* memkind_lib = dlopen("libmemkind.so.0", RTLD_NOW);
-  if (!memkind_lib) {
-    LOG(WARNING) << "could not dlopen: " << dlerror();
-    return;
-  }
-  auto cleanup = MakeScopedCleanup([&]() {
-    dlclose(memkind_lib);
-  });
-
-#define DLSYM_OR_RETURN(func_name, handle) do { \
-    const Status _s = TryDlsym(memkind_lib, func_name, handle); \
-    if (!_s.ok()) { \
-      LOG(WARNING) << _s.ToString(); \
-      return; \
-    } \
-  } while (0)
-
-  DLSYM_OR_RETURN("memkind_create_pmem", &g_memkind_create_pmem);
-  DLSYM_OR_RETURN("memkind_destroy_kind", &g_memkind_destroy_kind);
-  DLSYM_OR_RETURN("memkind_malloc", &g_memkind_malloc);
-  DLSYM_OR_RETURN("memkind_malloc_usable_size", &g_memkind_malloc_usable_size);
-  DLSYM_OR_RETURN("memkind_free", &g_memkind_free);
-#undef DLSYM_OR_RETURN
-
-  g_memkind_available = true;
-
-  // Need to keep the memkind library handle open so our function pointers
-  // remain loaded in memory.
-  cleanup.cancel();
-}
-
-typedef simple_spinlock MutexType;
-
-// LRU cache implementation
-
-// An entry is a variable length heap-allocated structure.  Entries
-// are kept in a circular doubly linked list ordered by access time.
-struct LRUHandle {
-  Cache::EvictionCallback* eviction_callback;
-  LRUHandle* next_hash;
-  LRUHandle* next;
-  LRUHandle* prev;
-  size_t charge;      // TODO(opt): Only allow uint32_t?
-  uint32_t key_length;
-  uint32_t val_length;
-  Atomic32 refs;
-  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
-  uint8_t* kv_data;
-
-  Slice key() const {
-    return Slice(kv_data, key_length);
-  }
-
-  Slice value() const {
-    return Slice(&kv_data[key_length], val_length);
-  }
-
-  uint8_t* val_ptr() {
-    return &kv_data[key_length];
-  }
-};
-
-// We provide our own simple hash table since it removes a whole bunch
-// of porting hacks and is also faster than some of the built-in hash
-// table implementations in some of the compiler/runtime combinations
-// we have tested.  E.g., readrandom speeds up by ~5% over the g++
-// 4.4.3's builtin hashtable.
-class HandleTable {
- public:
-  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
-  ~HandleTable() { delete[] list_; }
-
-  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
-    return *FindPointer(key, hash);
-  }
-
-  LRUHandle* Insert(LRUHandle* h) {
-    LRUHandle** ptr = FindPointer(h->key(), h->hash);
-    LRUHandle* old = *ptr;
-    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
-    *ptr = h;
-    if (old == nullptr) {
-      ++elems_;
-      if (elems_ > length_) {
-        // Since each cache entry is fairly large, we aim for a small
-        // average linked list length (<= 1).
-        Resize();
-      }
-    }
-    return old;
-  }
-
-  LRUHandle* Remove(const Slice& key, uint32_t hash) {
-    LRUHandle** ptr = FindPointer(key, hash);
-    LRUHandle* result = *ptr;
-    if (result != nullptr) {
-      *ptr = result->next_hash;
-      --elems_;
-    }
-    return result;
-  }
-
- private:
-  // The table consists of an array of buckets where each bucket is
-  // a linked list of cache entries that hash into the bucket.
-  uint32_t length_;
-  uint32_t elems_;
-  LRUHandle** list_;
-
-  // Return a pointer to slot that points to a cache entry that
-  // matches key/hash.  If there is no such cache entry, return a
-  // pointer to the trailing slot in the corresponding linked list.
-  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
-    LRUHandle** ptr = &list_[hash & (length_ - 1)];
-    while (*ptr != nullptr &&
-           ((*ptr)->hash != hash || key != (*ptr)->key())) {
-      ptr = &(*ptr)->next_hash;
-    }
-    return ptr;
-  }
-
-  void Resize() {
-    uint32_t new_length = 16;
-    while (new_length < elems_ * 1.5) {
-      new_length *= 2;
-    }
-    LRUHandle** new_list = new LRUHandle*[new_length];
-    memset(new_list, 0, sizeof(new_list[0]) * new_length);
-    uint32_t count = 0;
-    for (uint32_t i = 0; i < length_; i++) {
-      LRUHandle* h = list_[i];
-      while (h != nullptr) {
-        LRUHandle* next = h->next_hash;
-        uint32_t hash = h->hash;
-        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
-        h->next_hash = *ptr;
-        *ptr = h;
-        h = next;
-        count++;
-      }
-    }
-    DCHECK_EQ(elems_, count);
-    delete[] list_;
-    list_ = new_list;
-    length_ = new_length;
-  }
-};
-
-// A single shard of sharded cache.
-class NvmLRUCache {
- public:
-  explicit NvmLRUCache(memkind *vmp);
-  ~NvmLRUCache();
-
-  // Separate from constructor so caller can easily make an array of LRUCache
-  void SetCapacity(size_t capacity) { capacity_ = capacity; }
-
-  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
-
-  Cache::Handle* Insert(LRUHandle* e, Cache::EvictionCallback* eviction_callback);
-
-  // Like Cache::Lookup, but with an extra "hash" parameter.
-  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
-  void Release(Cache::Handle* handle);
-  void Erase(const Slice& key, uint32_t hash);
-  size_t Invalidate(const Cache::InvalidationControl& ctl);
-  void* Allocate(size_t size);
-
- private:
-  void NvmLRU_Remove(LRUHandle* e);
-  void NvmLRU_Append(LRUHandle* e);
-  // Just reduce the reference count by 1.
-  // Return true if last reference
-  bool Unref(LRUHandle* e);
-  void FreeEntry(LRUHandle* e);
-
-  // Evict the LRU item in the cache, adding it to the linked list
-  // pointed to by 'to_remove_head'.
-  void EvictOldestUnlocked(LRUHandle** to_remove_head);
-
-  // Free all of the entries in the linked list that has to_free_head
-  // as its head.
-  void FreeLRUEntries(LRUHandle* to_free_head);
-
-  // Wrapper around memkind_malloc which injects failures based on a flag.
-  void* MemkindMalloc(size_t size);
-
-  // Initialized before use.
-  size_t capacity_;
-
-  // mutex_ protects the following state.
-  MutexType mutex_;
-  size_t usage_;
-
-  // Dummy head of LRU list.
-  // lru.prev is newest entry, lru.next is oldest entry.
-  LRUHandle lru_;
-
-  HandleTable table_;
-
-  memkind* vmp_;
-
-  CacheMetrics* metrics_;
-};
-
-NvmLRUCache::NvmLRUCache(memkind* vmp)
-  : usage_(0),
-  vmp_(vmp),
-  metrics_(nullptr) {
-  // Make empty circular linked list
-  lru_.next = &lru_;
-  lru_.prev = &lru_;
-}
-
-NvmLRUCache::~NvmLRUCache() {
-  for (LRUHandle* e = lru_.next; e != &lru_; ) {
-    LRUHandle* next = e->next;
-    DCHECK_EQ(e->refs, 1);  // Error if caller has an unreleased handle
-    if (Unref(e)) {
-      FreeEntry(e);
-    }
-    e = next;
-  }
-}
-
-void* NvmLRUCache::MemkindMalloc(size_t size) {
-  if (PREDICT_FALSE(FLAGS_nvm_cache_simulate_allocation_failure)) {
-    return nullptr;
-  }
-  return CALL_MEMKIND(memkind_malloc, vmp_, size);
-}
-
-bool NvmLRUCache::Unref(LRUHandle* e) {
-  DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
-  return !base::RefCountDec(&e->refs);
-}
-
-void NvmLRUCache::FreeEntry(LRUHandle* e) {
-  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
-  if (e->eviction_callback) {
-    e->eviction_callback->EvictedEntry(e->key(), e->value());
-  }
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->cache_usage->DecrementBy(e->charge);
-    metrics_->evictions->Increment();
-  }
-  CALL_MEMKIND(memkind_free, vmp_, e);
-}
-
-// Allocate NVM memory.
-void* NvmLRUCache::Allocate(size_t size) {
-  return MemkindMalloc(size);
-}
-
-void NvmLRUCache::NvmLRU_Remove(LRUHandle* e) {
-  e->next->prev = e->prev;
-  e->prev->next = e->next;
-  usage_ -= e->charge;
-}
-
-void NvmLRUCache::NvmLRU_Append(LRUHandle* e) {
-  // Make "e" newest entry by inserting just before lru_
-  e->next = &lru_;
-  e->prev = lru_.prev;
-  e->prev->next = e;
-  e->next->prev = e;
-  usage_ += e->charge;
-}
-
-Cache::Handle* NvmLRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
- LRUHandle* e;
-  {
-    std::lock_guard<MutexType> l(mutex_);
-    e = table_.Lookup(key, hash);
-    if (e != nullptr) {
-      // If an entry exists, remove the old entry from the cache
-      // and re-add to the end of the linked list.
-      base::RefCountInc(&e->refs);
-      NvmLRU_Remove(e);
-      NvmLRU_Append(e);
-    }
-  }
-
-  // Do the metrics outside of the lock.
-  if (metrics_) {
-    metrics_->lookups->Increment();
-    bool was_hit = (e != nullptr);
-    if (was_hit) {
-      if (caching) {
-        metrics_->cache_hits_caching->Increment();
-      } else {
-        metrics_->cache_hits->Increment();
-      }
-    } else {
-      if (caching) {
-        metrics_->cache_misses_caching->Increment();
-      } else {
-        metrics_->cache_misses->Increment();
-      }
-    }
-  }
-
-  return reinterpret_cast<Cache::Handle*>(e);
-}
-
-void NvmLRUCache::Release(Cache::Handle* handle) {
-  LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
-  bool last_reference = Unref(e);
-  if (last_reference) {
-    FreeEntry(e);
-  }
-}
-
-void NvmLRUCache::EvictOldestUnlocked(LRUHandle** to_remove_head) {
-  LRUHandle* old = lru_.next;
-  NvmLRU_Remove(old);
-  table_.Remove(old->key(), old->hash);
-  if (Unref(old)) {
-    old->next = *to_remove_head;
-    *to_remove_head = old;
-  }
-}
-
-void NvmLRUCache::FreeLRUEntries(LRUHandle* to_free_head) {
-  while (to_free_head != nullptr) {
-    LRUHandle* next = to_free_head->next;
-    FreeEntry(to_free_head);
-    to_free_head = next;
-  }
-}
-
-Cache::Handle* NvmLRUCache::Insert(LRUHandle* e,
-                                   Cache::EvictionCallback* eviction_callback) {
-  DCHECK(e);
-  LRUHandle* to_remove_head = nullptr;
-
-  e->refs = 2;  // One from LRUCache, one for the returned handle
-  e->eviction_callback = eviction_callback;
-  if (PREDICT_TRUE(metrics_)) {
-    metrics_->cache_usage->IncrementBy(e->charge);
-    metrics_->inserts->Increment();
-  }
-
-  {
-    std::lock_guard<MutexType> l(mutex_);
-
-    NvmLRU_Append(e);
-
-    LRUHandle* old = table_.Insert(e);
-    if (old != nullptr) {
-      NvmLRU_Remove(old);
-      if (Unref(old)) {
-        old->next = to_remove_head;
-        to_remove_head = old;
-      }
-    }
-
-    while (usage_ > capacity_ && lru_.next != &lru_) {
-      EvictOldestUnlocked(&to_remove_head);
-    }
-  }
-
-  // we free the entries here outside of mutex for
-  // performance reasons
-  FreeLRUEntries(to_remove_head);
-
-  return reinterpret_cast<Cache::Handle*>(e);
-}
-
-void NvmLRUCache::Erase(const Slice& key, uint32_t hash) {
-  LRUHandle* e;
-  bool last_reference = false;
-  {
-    std::lock_guard<MutexType> l(mutex_);
-    e = table_.Remove(key, hash);
-    if (e != nullptr) {
-      NvmLRU_Remove(e);
-      last_reference = Unref(e);
-    }
-  }
-  // mutex not held here
-  // last_reference will only be true if e != nullptr
-  if (last_reference) {
-    FreeEntry(e);
-  }
-}
-
-size_t NvmLRUCache::Invalidate(const Cache::InvalidationControl& ctl) {
-  size_t invalid_entry_count = 0;
-  size_t valid_entry_count = 0;
-  LRUHandle* to_remove_head = nullptr;
-
-  {
-    std::lock_guard<MutexType> l(mutex_);
-
-    // rl_.next is the oldest entry in the recency list.
-    LRUHandle* h = lru_.next;
-    while (h != nullptr && h != &lru_ &&
-           ctl.iteration_func(valid_entry_count, invalid_entry_count)) {
-      if (ctl.validity_func(h->key(), h->value())) {
-        // Continue iterating over the list.
-        h = h->next;
-        ++valid_entry_count;
-        continue;
-      }
-      // Copy the handle slated for removal.
-      LRUHandle* h_to_remove = h;
-      // Prepare for next iteration of the cycle.
-      h = h->next;
-
-      NvmLRU_Remove(h_to_remove);
-      table_.Remove(h_to_remove->key(), h_to_remove->hash);
-      if (Unref(h_to_remove)) {
-        h_to_remove->next = to_remove_head;
-        to_remove_head = h_to_remove;
-      }
-      ++invalid_entry_count;
-    }
-  }
-
-  FreeLRUEntries(to_remove_head);
-
-  return invalid_entry_count;
-}
-
-// Determine the number of bits of the hash that should be used to determine
-// the cache shard. This, in turn, determines the number of shards.
-int DetermineShardBits() {
-  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
-      0 : Bits::Log2Ceiling(base::NumCPUs());
-  VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache.";
-  return bits;
-}
-
-class ShardedLRUCache : public Cache {
- private:
-  unique_ptr<CacheMetrics> metrics_;
-  vector<NvmLRUCache*> shards_;
-
-  // Number of bits of hash used to determine the shard.
-  const int shard_bits_;
-
-  memkind* vmp_;
-
-  static inline uint32_t HashSlice(const Slice& s) {
-    return util_hash::CityHash64(
-      reinterpret_cast<const char *>(s.data()), s.size());
-  }
-
-  uint32_t Shard(uint32_t hash) {
-    // Widen to uint64 before shifting, or else on a single CPU,
-    // we would try to shift a uint32_t by 32 bits, which is undefined.
-    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
-  }
-
- public:
-  explicit ShardedLRUCache(size_t capacity, const string& /*id*/, memkind* vmp)
-      : shard_bits_(DetermineShardBits()),
-        vmp_(vmp) {
-    int num_shards = 1 << shard_bits_;
-    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
-    for (int s = 0; s < num_shards; s++) {
-      gscoped_ptr<NvmLRUCache> shard(new NvmLRUCache(vmp_));
-      shard->SetCapacity(per_shard);
-      shards_.push_back(shard.release());
-    }
-  }
-
-  virtual ~ShardedLRUCache() {
-    STLDeleteElements(&shards_);
-    // Per the note at the top of this file, our cache is entirely volatile.
-    // Hence, when the cache is destructed, we delete the underlying
-    // memkind pool.
-    CALL_MEMKIND(memkind_destroy_kind, vmp_);
-  }
-
-  virtual UniqueHandle Insert(UniquePendingHandle handle,
-                              Cache::EvictionCallback* eviction_callback) OVERRIDE {
-    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle.release()));
-    return UniqueHandle(
-        shards_[Shard(h->hash)]->Insert(h, eviction_callback),
-        Cache::HandleDeleter(this));
-  }
-  virtual UniqueHandle Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
-    const uint32_t hash = HashSlice(key);
-    return UniqueHandle(
-        shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE),
-        Cache::HandleDeleter(this));
-  }
-  virtual void Release(Handle* handle) OVERRIDE {
-    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
-    shards_[Shard(h->hash)]->Release(handle);
-  }
-  virtual void Erase(const Slice& key) OVERRIDE {
-    const uint32_t hash = HashSlice(key);
-    shards_[Shard(hash)]->Erase(key, hash);
-  }
-  virtual Slice Value(const UniqueHandle& handle) const OVERRIDE {
-    return reinterpret_cast<const LRUHandle*>(handle.get())->value();
-  }
-  virtual uint8_t* MutableValue(UniquePendingHandle* handle) OVERRIDE {
-    return reinterpret_cast<LRUHandle*>(handle->get())->val_ptr();
-  }
-
-  virtual void SetMetrics(unique_ptr<CacheMetrics> metrics) OVERRIDE {
-    metrics_ = std::move(metrics);
-    for (NvmLRUCache* cache : shards_) {
-      cache->SetMetrics(metrics_.get());
-    }
-  }
-  virtual UniquePendingHandle Allocate(Slice key, int val_len, int charge) OVERRIDE {
-    int key_len = key.size();
-    DCHECK_GE(key_len, 0);
-    DCHECK_GE(val_len, 0);
-
-    // Try allocating from each of the shards -- if memkind is tight,
-    // this can cause eviction, so we might have better luck in different
-    // shards.
-    for (NvmLRUCache* cache : shards_) {
-      UniquePendingHandle ph(static_cast<PendingHandle*>(
-          cache->Allocate(sizeof(LRUHandle) + key_len + val_len)),
-          Cache::PendingHandleDeleter(this));
-      if (ph) {
-        LRUHandle* handle = reinterpret_cast<LRUHandle*>(ph.get());
-        uint8_t* buf = reinterpret_cast<uint8_t*>(ph.get());
-        handle->kv_data = &buf[sizeof(LRUHandle)];
-        handle->val_length = val_len;
-        handle->key_length = key_len;
-        handle->charge = (charge == kAutomaticCharge) ?
-            CALL_MEMKIND(memkind_malloc_usable_size, vmp_, buf) : charge;
-        handle->hash = HashSlice(key);
-        memcpy(handle->kv_data, key.data(), key.size());
-        return ph;
-      }
-    }
-    // TODO(unknown): increment a metric here on allocation failure.
-    return UniquePendingHandle(nullptr, Cache::PendingHandleDeleter(this));
-  }
-
-  virtual void Free(PendingHandle* ph) OVERRIDE {
-    CALL_MEMKIND(memkind_free, vmp_, ph);
-  }
-
-  size_t Invalidate(const InvalidationControl& ctl) override {
-    size_t invalidated_count = 0;
-    for (auto& shard: shards_) {
-      invalidated_count += shard->Invalidate(ctl);
-    }
-    return invalidated_count;
-  }
-};
-
-} // end anonymous namespace
-
-template<>
-Cache* NewCache<Cache::EvictionPolicy::LRU,
-                Cache::MemoryType::NVM>(size_t capacity, const std::string& id) {
-  std::call_once(g_memkind_ops_flag, InitMemkindOps);
-
-  // TODO(adar): we should plumb the failure up the call stack, but at the time
-  // of writing the NVM cache is only usable by the block cache, and its use of
-  // the singleton pattern prevents the surfacing of errors.
-  CHECK(g_memkind_available) << "Memkind not available!";
-
-  // memkind_create_pmem() will fail if the capacity is too small, but with
-  // an inscrutable error. So, we'll check ourselves.
-  CHECK_GE(capacity, MEMKIND_PMEM_MIN_SIZE)
-    << "configured capacity " << capacity << " bytes is less than "
-    << "the minimum capacity for an NVM cache: " << MEMKIND_PMEM_MIN_SIZE;
-
-  memkind* vmp;
-  int err = CALL_MEMKIND(memkind_create_pmem, FLAGS_nvm_cache_path.c_str(), capacity, &vmp);
-  // If we cannot create the cache pool we should not retry.
-  PLOG_IF(FATAL, err) << "Could not initialize NVM cache library in path "
-                           << FLAGS_nvm_cache_path.c_str();
-
-  return new ShardedLRUCache(capacity, id, vmp);
-}
-
-bool CanUseNVMCacheForTests() {
-  std::call_once(g_memkind_ops_flag, InitMemkindOps);
-  return g_memkind_available;
-}
-
-} // namespace kudu
diff --git a/be/src/util/cache/nvm_cache.h b/be/src/util/cache/nvm_cache.h
deleted file mode 100644
index d6c2762..0000000
--- a/be/src/util/cache/nvm_cache.h
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include <cstddef>
-#include <string>
-
-#include "kudu/util/cache.h"
-
-namespace kudu {
-
-// Convenience macro for invoking CanUseNVMCacheForTests.
-#define RETURN_IF_NO_NVM_CACHE(memory_type) do { \
-  if ((memory_type) == Cache::MemoryType::NVM && !CanUseNVMCacheForTests()) { \
-    LOG(WARNING) << "test is skipped; NVM cache cannot be created"; \
-    return; \
-  } \
-} while (0)
-
-// Returns true if an NVM cache can be created on this machine; false otherwise.
-//
-// Only intended for use in unit tests.
-bool CanUseNVMCacheForTests();
-
-// Create a new LRU cache with a fixed size capacity. This implementation
-// of Cache uses the least-recently-used eviction policy and stored in NVM.
-template<>
-Cache* NewCache<Cache::EvictionPolicy::LRU,
-                Cache::MemoryType::NVM>(size_t capacity, const std::string& id);
-
-}  // namespace kudu
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 495fdcb..b992ad1 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -78,7 +78,6 @@ be/src/kudu/security/x509_check_host.h
 be/src/kudu/security/x509_check_host.cc
 be/src/util/cache/cache.h
 be/src/util/cache/cache.cc
-be/src/util/cache/nvm_cache.cc
 be/src/util/cache/cache-test.cc
 
 # http://www.apache.org/legal/src-headers.html: "Short informational text files; for


[impala] 01/02: IMPALA-8690 (prep 1): Copy Kudu cache code to be/src/util/cache

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eead1873dddd2ba2b2d97aeb80482835e3e1c7eb
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Feb 5 13:32:15 2020 -0800

    IMPALA-8690 (prep 1): Copy Kudu cache code to be/src/util/cache
    
    The data cache uses Kudu's cache code in be/src/kudu/util.
    To make it easy to make Impala specific changes (such as
    introducing new cache eviction algorithms), this copies the
    code into Impala's utilities directory.
    
    To make the review easier, this is a simple copy of the files
    with a minimal CMakeLists.txt and RAT changes. This does not link
    the resulting library into anything.
    
    A subsequent change will modify the code to be usable by
    Impala. To test, I verified that this compiles.
    
    Change-Id: I0e7b8b852b56613803dea125456fa9e8b3736d76
    Reviewed-on: http://gerrit.cloudera.org:8080/15169
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Joe McDonnell <jo...@cloudera.com>
---
 be/src/util/cache/CMakeLists.txt |  31 ++
 be/src/util/cache/cache-bench.cc | 187 ++++++++++
 be/src/util/cache/cache-test.cc  | 535 +++++++++++++++++++++++++++++
 be/src/util/cache/cache.cc       | 711 ++++++++++++++++++++++++++++++++++++++
 be/src/util/cache/cache.h        | 363 ++++++++++++++++++++
 be/src/util/cache/nvm_cache.cc   | 719 +++++++++++++++++++++++++++++++++++++++
 be/src/util/cache/nvm_cache.h    |  45 +++
 bin/rat_exclude_files.txt        |   4 +
 8 files changed, 2595 insertions(+)

diff --git a/be/src/util/cache/CMakeLists.txt b/be/src/util/cache/CMakeLists.txt
new file mode 100644
index 0000000..c6065e6
--- /dev/null
+++ b/be/src/util/cache/CMakeLists.txt
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/util/cache")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/util/cache")
+
+add_library(UtilCache
+  cache.cc
+  nvm_cache.cc
+)
+add_dependencies(UtilCache gen-deps gen_ir_descriptions)
+
+# Currently does not build cache-test or cache-bench, because they depend on Kudu
+# build infrastructure.
diff --git a/be/src/util/cache/cache-bench.cc b/be/src/util/cache/cache-bench.cc
new file mode 100644
index 0000000..91026ae
--- /dev/null
+++ b/be/src/util/cache/cache-bench.cc
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(num_threads, 16, "The number of threads to access the cache concurrently.");
+DEFINE_int32(run_seconds, 1, "The number of seconds to run the benchmark");
+
+using std::atomic;
+using std::pair;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+
+// Benchmark a 1GB cache.
+static constexpr int kCacheCapacity = 1024 * 1024 * 1024;
+// Use 4kb entries.
+static constexpr int kEntrySize = 4 * 1024;
+
+// Test parameterization.
+struct BenchSetup {
+  enum class Pattern {
+    // Zipfian distribution -- a small number of items make up the
+    // vast majority of lookups.
+    ZIPFIAN,
+    // Every item is equally likely to be looked up.
+    UNIFORM
+  };
+  Pattern pattern;
+
+  // The ratio between the size of the dataset and the cache.
+  //
+  // A value smaller than 1 will ensure that the whole dataset fits
+  // in the cache.
+  double dataset_cache_ratio;
+
+  string ToString() const {
+    string ret;
+    switch (pattern) {
+      case Pattern::ZIPFIAN: ret += "ZIPFIAN"; break;
+      case Pattern::UNIFORM: ret += "UNIFORM"; break;
+    }
+    ret += StringPrintf(" ratio=%.2fx n_unique=%d", dataset_cache_ratio, max_key());
+    return ret;
+  }
+
+  // Return the maximum cache key to be generated for a lookup.
+  uint32_t max_key() const {
+    return static_cast<int64_t>(kCacheCapacity * dataset_cache_ratio) / kEntrySize;
+  }
+};
+
+class CacheBench : public KuduTest,
+                   public testing::WithParamInterface<BenchSetup>{
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    cache_.reset(NewCache(kCacheCapacity, "test-cache"));
+  }
+
+  // Run queries against the cache until '*done' becomes true.
+  // Returns a pair of the number of cache hits and lookups.
+  pair<int64_t, int64_t> DoQueries(const atomic<bool>* done) {
+    const BenchSetup& setup = GetParam();
+    Random r(GetRandomSeed32());
+    int64_t lookups = 0;
+    int64_t hits = 0;
+    while (!*done) {
+      uint32_t int_key;
+      if (setup.pattern == BenchSetup::Pattern::ZIPFIAN) {
+        int_key = r.Skewed(Bits::Log2Floor(setup.max_key()));
+      } else {
+        int_key = r.Uniform(setup.max_key());
+      }
+      char key_buf[sizeof(int_key)];
+      memcpy(key_buf, &int_key, sizeof(int_key));
+      Slice key_slice(key_buf, arraysize(key_buf));
+      auto h(cache_->Lookup(key_slice, Cache::EXPECT_IN_CACHE));
+      if (h) {
+        ++hits;
+      } else {
+        auto ph(cache_->Allocate(
+            key_slice, /* val_len=*/kEntrySize, /* charge=*/kEntrySize));
+        cache_->Insert(std::move(ph), nullptr);
+      }
+      ++lookups;
+    }
+    return {hits, lookups};
+  }
+
+  // Starts the given number of threads to concurrently call DoQueries.
+  // Returns the aggregated number of cache hits and lookups.
+  pair<int64_t, int64_t> RunQueryThreads(int n_threads, int n_seconds) {
+    vector<thread> threads(n_threads);
+    atomic<bool> done(false);
+    atomic<int64_t> total_lookups(0);
+    atomic<int64_t> total_hits(0);
+    for (int i = 0; i < n_threads; i++) {
+      threads[i] = thread([&]() {
+          pair<int64_t, int64_t> hits_lookups = DoQueries(&done);
+          total_hits += hits_lookups.first;
+          total_lookups += hits_lookups.second;
+        });
+    }
+    SleepFor(MonoDelta::FromSeconds(n_seconds));
+    done = true;
+    for (auto& t : threads) {
+      t.join();
+    }
+    return {total_hits, total_lookups};
+  }
+
+ protected:
+  unique_ptr<Cache> cache_;
+};
+
+// Test both distributions, and for each, test both the case where the data
+// fits in the cache and where it is a bit larger.
+INSTANTIATE_TEST_CASE_P(Patterns, CacheBench, testing::ValuesIn(std::vector<BenchSetup>{
+      {BenchSetup::Pattern::ZIPFIAN, 1.0},
+      {BenchSetup::Pattern::ZIPFIAN, 3.0},
+      {BenchSetup::Pattern::UNIFORM, 1.0},
+      {BenchSetup::Pattern::UNIFORM, 3.0}
+    }));
+
+TEST_P(CacheBench, RunBench) {
+  const BenchSetup& setup = GetParam();
+
+  // Run a short warmup phase to try to populate the cache. Otherwise even if the
+  // dataset is smaller than the cache capacity, we would count a bunch of misses
+  // during the warm-up phase.
+  LOG(INFO) << "Warming up...";
+  RunQueryThreads(FLAGS_num_threads, 1);
+
+  LOG(INFO) << "Running benchmark...";
+  pair<int64_t, int64_t> hits_lookups = RunQueryThreads(FLAGS_num_threads, FLAGS_run_seconds);
+  int64_t hits = hits_lookups.first;
+  int64_t lookups = hits_lookups.second;
+
+  int64_t l_per_sec = lookups / FLAGS_run_seconds;
+  double hit_rate = static_cast<double>(hits) / lookups;
+  string test_case = setup.ToString();
+  LOG(INFO) << test_case << ": " << HumanReadableNum::ToString(l_per_sec) << " lookups/sec";
+  LOG(INFO) << test_case << ": " << StringPrintf("%.1f", hit_rate * 100.0) << "% hit rate";
+}
+
+} // namespace kudu
diff --git a/be/src/util/cache/cache-test.cc b/be/src/util/cache/cache-test.cc
new file mode 100644
index 0000000..6ba34b2
--- /dev/null
+++ b/be/src/util/cache/cache-test.cc
@@ -0,0 +1,535 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "kudu/util/cache.h"
+
+#include <cstring>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/block_cache_metrics.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/coding.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/nvm_cache.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(cache_force_single_shard);
+DECLARE_string(nvm_cache_path);
+
+DECLARE_double(cache_memtracker_approximation_ratio);
+
+using std::make_tuple;
+using std::tuple;
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Conversions between numeric keys/values and the types expected by Cache.
+static std::string EncodeInt(int k) {
+  faststring result;
+  PutFixed32(&result, k);
+  return result.ToString();
+}
+static int DecodeInt(const Slice& k) {
+  CHECK_EQ(4, k.size());
+  return DecodeFixed32(k.data());
+}
+
+// Cache sharding policy affects the composition of the cache. Some test
+// scenarios assume cache is single-sharded to keep the logic simpler.
+enum class ShardingPolicy {
+  MultiShard,
+  SingleShard,
+};
+
+class CacheBaseTest : public KuduTest,
+                      public Cache::EvictionCallback {
+ public:
+  explicit CacheBaseTest(size_t cache_size)
+      : cache_size_(cache_size) {
+  }
+
+  size_t cache_size() const {
+    return cache_size_;
+  }
+
+  // Implementation of the EvictionCallback interface.
+  void EvictedEntry(Slice key, Slice val) override {
+    evicted_keys_.push_back(DecodeInt(key));
+    evicted_values_.push_back(DecodeInt(val));
+  }
+
+  int Lookup(int key) {
+    auto handle(cache_->Lookup(EncodeInt(key), Cache::EXPECT_IN_CACHE));
+    return handle ? DecodeInt(cache_->Value(handle)) : -1;
+  }
+
+  void Insert(int key, int value, int charge = 1) {
+    std::string key_str = EncodeInt(key);
+    std::string val_str = EncodeInt(value);
+    auto handle(cache_->Allocate(key_str, val_str.size(), charge));
+    CHECK(handle);
+    memcpy(cache_->MutableValue(&handle), val_str.data(), val_str.size());
+    cache_->Insert(std::move(handle), this);
+  }
+
+  void Erase(int key) {
+    cache_->Erase(EncodeInt(key));
+  }
+
+ protected:
+  void SetupWithParameters(Cache::MemoryType mem_type,
+                           Cache::EvictionPolicy eviction_policy,
+                           ShardingPolicy sharding_policy) {
+    // Disable approximate tracking of cache memory since we make specific
+    // assertions on the MemTracker in this test.
+    FLAGS_cache_memtracker_approximation_ratio = 0;
+
+    // Using single shard makes the logic of scenarios simple for capacity-
+    // and eviction-related behavior.
+    FLAGS_cache_force_single_shard =
+        (sharding_policy == ShardingPolicy::SingleShard);
+
+    if (google::GetCommandLineFlagInfoOrDie("nvm_cache_path").is_default) {
+      FLAGS_nvm_cache_path = GetTestPath("nvm-cache");
+      ASSERT_OK(Env::Default()->CreateDir(FLAGS_nvm_cache_path));
+    }
+
+    switch (eviction_policy) {
+      case Cache::EvictionPolicy::FIFO:
+        if (mem_type != Cache::MemoryType::DRAM) {
+          FAIL() << "FIFO cache can only be of DRAM type";
+        }
+        cache_.reset(NewCache<Cache::EvictionPolicy::FIFO,
+                              Cache::MemoryType::DRAM>(cache_size(),
+                                                       "cache_test"));
+        MemTracker::FindTracker("cache_test-sharded_fifo_cache", &mem_tracker_);
+        break;
+      case Cache::EvictionPolicy::LRU:
+        switch (mem_type) {
+          case Cache::MemoryType::DRAM:
+            cache_.reset(NewCache<Cache::EvictionPolicy::LRU,
+                                  Cache::MemoryType::DRAM>(cache_size(),
+                                                           "cache_test"));
+            break;
+          case Cache::MemoryType::NVM:
+            if (CanUseNVMCacheForTests()) {
+              cache_.reset(NewCache<Cache::EvictionPolicy::LRU,
+                                    Cache::MemoryType::NVM>(cache_size(),
+                                                            "cache_test"));
+            }
+            break;
+          default:
+            FAIL() << mem_type << ": unrecognized cache memory type";
+            break;
+        }
+        MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
+        break;
+      default:
+        FAIL() << "unrecognized cache eviction policy";
+        break;
+    }
+
+    // Since nvm cache does not have memtracker due to the use of
+    // tcmalloc for this we only check for it in the DRAM case.
+    if (mem_type == Cache::MemoryType::DRAM) {
+      ASSERT_TRUE(mem_tracker_.get());
+    }
+
+    // cache_ will be null if we're trying to set up a test for the NVM cache
+    // and were unable to do so.
+    if (cache_) {
+      scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(
+          &metric_registry_, "test");
+      unique_ptr<BlockCacheMetrics> metrics(new BlockCacheMetrics(entity));
+      cache_->SetMetrics(std::move(metrics));
+    }
+  }
+
+  const size_t cache_size_;
+  vector<int> evicted_keys_;
+  vector<int> evicted_values_;
+  shared_ptr<MemTracker> mem_tracker_;
+  unique_ptr<Cache> cache_;
+  MetricRegistry metric_registry_;
+};
+
+class CacheTest :
+    public CacheBaseTest,
+    public ::testing::WithParamInterface<tuple<Cache::MemoryType,
+                                               Cache::EvictionPolicy,
+                                               ShardingPolicy>> {
+ public:
+  CacheTest()
+      : CacheBaseTest(16 * 1024 * 1024) {
+  }
+
+  void SetUp() override {
+    const auto& param = GetParam();
+    SetupWithParameters(std::get<0>(param),
+                        std::get<1>(param),
+                        std::get<2>(param));
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(
+    CacheTypes, CacheTest,
+    ::testing::Values(
+        make_tuple(Cache::MemoryType::DRAM,
+                   Cache::EvictionPolicy::FIFO,
+                   ShardingPolicy::MultiShard),
+        make_tuple(Cache::MemoryType::DRAM,
+                   Cache::EvictionPolicy::FIFO,
+                   ShardingPolicy::SingleShard),
+        make_tuple(Cache::MemoryType::DRAM,
+                   Cache::EvictionPolicy::LRU,
+                   ShardingPolicy::MultiShard),
+        make_tuple(Cache::MemoryType::DRAM,
+                   Cache::EvictionPolicy::LRU,
+                   ShardingPolicy::SingleShard),
+        make_tuple(Cache::MemoryType::NVM,
+                   Cache::EvictionPolicy::LRU,
+                   ShardingPolicy::MultiShard),
+        make_tuple(Cache::MemoryType::NVM,
+                   Cache::EvictionPolicy::LRU,
+                   ShardingPolicy::SingleShard)));
+
+TEST_P(CacheTest, TrackMemory) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  if (mem_tracker_) {
+    Insert(100, 100, 1);
+    ASSERT_EQ(1, mem_tracker_->consumption());
+    Erase(100);
+    ASSERT_EQ(0, mem_tracker_->consumption());
+    ASSERT_EQ(1, mem_tracker_->peak_consumption());
+  }
+}
+
+TEST_P(CacheTest, HitAndMiss) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  ASSERT_EQ(-1, Lookup(100));
+
+  Insert(100, 101);
+  ASSERT_EQ(101, Lookup(100));
+  ASSERT_EQ(-1,  Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+
+  Insert(200, 201);
+  ASSERT_EQ(101, Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+
+  Insert(100, 102);
+  ASSERT_EQ(102, Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[0]);
+  ASSERT_EQ(101, evicted_values_[0]);
+}
+
+TEST_P(CacheTest, Erase) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  Erase(200);
+  ASSERT_EQ(0, evicted_keys_.size());
+
+  Insert(100, 101);
+  Insert(200, 201);
+  Erase(100);
+  ASSERT_EQ(-1,  Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[0]);
+  ASSERT_EQ(101, evicted_values_[0]);
+
+  Erase(100);
+  ASSERT_EQ(-1,  Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(1, evicted_keys_.size());
+}
+
+TEST_P(CacheTest, EntriesArePinned) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  Insert(100, 101);
+  auto h1 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+  ASSERT_EQ(101, DecodeInt(cache_->Value(h1)));
+
+  Insert(100, 102);
+  auto h2 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+  ASSERT_EQ(102, DecodeInt(cache_->Value(h2)));
+  ASSERT_EQ(0, evicted_keys_.size());
+
+  h1.reset();
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[0]);
+  ASSERT_EQ(101, evicted_values_[0]);
+
+  Erase(100);
+  ASSERT_EQ(-1, Lookup(100));
+  ASSERT_EQ(1, evicted_keys_.size());
+
+  h2.reset();
+  ASSERT_EQ(2, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[1]);
+  ASSERT_EQ(102, evicted_values_[1]);
+}
+
+// Add a bunch of light and heavy entries and then count the combined
+// size of items still in the cache, which must be approximately the
+// same as the total capacity.
+TEST_P(CacheTest, HeavyEntries) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  const int kLight = cache_size() / 1000;
+  const int kHeavy = cache_size() / 100;
+  int added = 0;
+  int index = 0;
+  while (added < 2 * cache_size()) {
+    const int weight = (index & 1) ? kLight : kHeavy;
+    Insert(index, 1000+index, weight);
+    added += weight;
+    index++;
+  }
+
+  int cached_weight = 0;
+  for (int i = 0; i < index; i++) {
+    const int weight = (i & 1 ? kLight : kHeavy);
+    int r = Lookup(i);
+    if (r >= 0) {
+      cached_weight += weight;
+      ASSERT_EQ(1000+i, r);
+    }
+  }
+  ASSERT_LE(cached_weight, cache_size() + cache_size() / 10);
+}
+
+TEST_P(CacheTest, InvalidateAllEntries) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  constexpr const int kEntriesNum = 1024;
+  // This scenarios assumes no evictions are done at the cache capacity.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({}));
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+  // Remove a few entries from the cache (sparse pattern of keys).
+  constexpr const int kSparseKeys[] = {1, 100, 101, 500, 501, 512, 999, 1001};
+  for (const auto key : kSparseKeys) {
+    Erase(key);
+  }
+  ASSERT_EQ(ARRAYSIZE(kSparseKeys), evicted_keys_.size());
+
+  // All inserted entries, except for the removed one, should be invalidated.
+  ASSERT_EQ(kEntriesNum - ARRAYSIZE(kSparseKeys), cache_->Invalidate({}));
+  // In the end, no entries should be left in the cache.
+  ASSERT_EQ(kEntriesNum, evicted_keys_.size());
+}
+
+TEST_P(CacheTest, InvalidateNoEntries) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  constexpr const int kEntriesNum = 10;
+  // This scenarios assumes no evictions are done at the cache capacity.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::ValidityFunc func = [](Slice /* key */, Slice /* value */) {
+    return true;
+  };
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+
+  // No entries should be invalidated since the validity function considers
+  // all entries valid.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+  ASSERT_TRUE(evicted_keys_.empty());
+}
+
+TEST_P(CacheTest, InvalidateNoEntriesNoAdvanceIterationFunctor) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  constexpr const int kEntriesNum = 256;
+  // This scenarios assumes no evictions are done at the cache capacity.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::InvalidationControl ctl = {
+    Cache::kInvalidateAllEntriesFunc,
+    [](size_t /* valid_entries_count */, size_t /* invalid_entries_count */) {
+      // Never advance over the item list.
+      return false;
+    }
+  };
+
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate(ctl));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+
+  // No entries should be invalidated since the iteration functor doesn't
+  // advance over the list of entries, even if every entry is declared invalid.
+  ASSERT_EQ(0, cache_->Invalidate(ctl));
+  // In the end, all entries should be in the cache.
+  ASSERT_EQ(0, evicted_keys_.size());
+}
+
+TEST_P(CacheTest, InvalidateOddKeyEntries) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  constexpr const int kEntriesNum = 64;
+  // This scenarios assumes no evictions are done at the cache capacity.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::ValidityFunc func = [](Slice key, Slice /* value */) {
+    return DecodeInt(key) % 2 == 0;
+  };
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+  ASSERT_EQ(kEntriesNum / 2, cache_->Invalidate({ func }));
+  ASSERT_EQ(kEntriesNum / 2, evicted_keys_.size());
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    if (i % 2 == 0) {
+      ASSERT_EQ(i,  Lookup(i));
+    } else {
+      ASSERT_EQ(-1,  Lookup(i));
+    }
+  }
+}
+
+// This class is dedicated for scenarios specific for FIFOCache.
+// The scenarios use a single-shard cache for simpler logic.
+class FIFOCacheTest : public CacheBaseTest {
+ public:
+  FIFOCacheTest()
+      : CacheBaseTest(10 * 1024) {
+  }
+
+  void SetUp() override {
+    SetupWithParameters(Cache::MemoryType::DRAM,
+                        Cache::EvictionPolicy::FIFO,
+                        ShardingPolicy::SingleShard);
+  }
+};
+
+// Verify how the eviction behavior of a FIFO cache.
+TEST_F(FIFOCacheTest, EvictionPolicy) {
+  static constexpr int kNumElems = 20;
+  const int size_per_elem = cache_size() / kNumElems;
+  // First data chunk: fill the cache up to the capacity.
+  int idx = 0;
+  do {
+    Insert(idx, idx, size_per_elem);
+    // Keep looking up the very first entry: this is to make sure lookups
+    // do not affect the recency criteria of the eviction policy for FIFO cache.
+    Lookup(0);
+    ++idx;
+  } while (evicted_keys_.empty());
+  ASSERT_GT(idx, 1);
+
+  // Make sure the earliest inserted entry was evicted.
+  ASSERT_EQ(-1, Lookup(0));
+
+  // Verify that the 'empirical' capacity matches the expected capacity
+  // (it's a single-shard cache).
+  const int capacity = idx - 1;
+  ASSERT_EQ(kNumElems, capacity);
+
+  // Second data chunk: add (capacity / 2) more elements.
+  for (int i = 1; i < capacity / 2; ++i) {
+    // Earlier inserted elements should be gone one-by-one as new elements are
+    // inserted, and lookups should not affect the recency criteria of the FIFO
+    // eviction policy.
+    ASSERT_EQ(i, Lookup(i));
+    Insert(capacity + i, capacity + i, size_per_elem);
+    ASSERT_EQ(capacity + i, Lookup(capacity + i));
+    ASSERT_EQ(-1, Lookup(i));
+  }
+  ASSERT_EQ(capacity / 2, evicted_keys_.size());
+
+  // Early inserted elements from the first chunk should be evicted
+  // to accommodate the elements from the second chunk.
+  for (int i = 0; i < capacity / 2; ++i) {
+    SCOPED_TRACE(Substitute("early inserted elements: index $0", i));
+    ASSERT_EQ(-1, Lookup(i));
+  }
+  // The later inserted elements from the first chunk should be still
+  // in the cache.
+  for (int i = capacity / 2; i < capacity; ++i) {
+    SCOPED_TRACE(Substitute("late inserted elements: index $0", i));
+    ASSERT_EQ(i, Lookup(i));
+  }
+}
+
+class LRUCacheTest :
+    public CacheBaseTest,
+    public ::testing::WithParamInterface<tuple<Cache::MemoryType,
+                                               ShardingPolicy>> {
+ public:
+  LRUCacheTest()
+      : CacheBaseTest(16 * 1024 * 1024) {
+  }
+
+  void SetUp() override {
+    const auto& param = GetParam();
+    SetupWithParameters(std::get<0>(param),
+                        Cache::EvictionPolicy::LRU,
+                        std::get<1>(param));
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(
+    CacheTypes, LRUCacheTest,
+    ::testing::Combine(::testing::Values(Cache::MemoryType::DRAM,
+                                         Cache::MemoryType::NVM),
+                       ::testing::Values(ShardingPolicy::MultiShard,
+                                         ShardingPolicy::SingleShard)));
+
+TEST_P(LRUCacheTest, EvictionPolicy) {
+  RETURN_IF_NO_NVM_CACHE(std::get<0>(GetParam()));
+  static constexpr int kNumElems = 1000;
+  const int size_per_elem = cache_size() / kNumElems;
+
+  Insert(100, 101);
+  Insert(200, 201);
+
+  // Loop adding and looking up new entries, but repeatedly accessing key 101.
+  // This frequently-used entry should not be evicted.
+  for (int i = 0; i < kNumElems + 1000; i++) {
+    Insert(1000+i, 2000+i, size_per_elem);
+    ASSERT_EQ(2000+i, Lookup(1000+i));
+    ASSERT_EQ(101, Lookup(100));
+  }
+  ASSERT_EQ(101, Lookup(100));
+  // Since '200' wasn't accessed in the loop above, it should have
+  // been evicted.
+  ASSERT_EQ(-1, Lookup(200));
+}
+
+}  // namespace kudu
diff --git a/be/src/util/cache/cache.cc b/be/src/util/cache/cache.cc
new file mode 100644
index 0000000..073fa51
--- /dev/null
+++ b/be/src/util/cache/cache.cc
@@ -0,0 +1,711 @@
+// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "kudu/util/cache.h"
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util_prod.h"
+
+// Useful in tests that require accurate cache capacity accounting.
+DEFINE_bool(cache_force_single_shard, false,
+            "Override all cache implementations to use just one shard");
+TAG_FLAG(cache_force_single_shard, hidden);
+
+DEFINE_double(cache_memtracker_approximation_ratio, 0.01,
+              "The MemTracker associated with a cache can accumulate error up to "
+              "this ratio to improve performance. For tests.");
+TAG_FLAG(cache_memtracker_approximation_ratio, hidden);
+
+using std::atomic;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+
+Cache::~Cache() {
+}
+
+const Cache::ValidityFunc Cache::kInvalidateAllEntriesFunc = [](
+    Slice /* key */, Slice /* value */) {
+  return false;
+};
+
+const Cache::IterationFunc Cache::kIterateOverAllEntriesFunc = [](
+    size_t /* valid_entries_num */, size_t /* invalid_entries_num */) {
+  return true;
+};
+
+namespace {
+
+// Recency list cache implementations (FIFO, LRU, etc.)
+
+// Recency list handle. An entry is a variable length heap-allocated structure.
+// Entries are kept in a circular doubly linked list ordered by some recency
+// criterion (e.g., access time for LRU policy, insertion time for FIFO policy).
+struct RLHandle {
+  Cache::EvictionCallback* eviction_callback;
+  RLHandle* next_hash;
+  RLHandle* next;
+  RLHandle* prev;
+  size_t charge;      // TODO(opt): Only allow uint32_t?
+  uint32_t key_length;
+  uint32_t val_length;
+  std::atomic<int32_t> refs;
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+
+  // The storage for the key/value pair itself. The data is stored as:
+  //   [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
+  uint8_t kv_data[1];   // Beginning of key/value pair
+
+  Slice key() const {
+    return Slice(kv_data, key_length);
+  }
+
+  uint8_t* mutable_val_ptr() {
+    int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*));
+    return &kv_data[val_offset];
+  }
+
+  const uint8_t* val_ptr() const {
+    return const_cast<RLHandle*>(this)->mutable_val_ptr();
+  }
+
+  Slice value() const {
+    return Slice(val_ptr(), val_length);
+  }
+};
+
+// We provide our own simple hash table since it removes a whole bunch
+// of porting hacks and is also faster than some of the built-in hash
+// table implementations in some of the compiler/runtime combinations
+// we have tested.  E.g., readrandom speeds up by ~5% over the g++
+// 4.4.3's builtin hashtable.
+class HandleTable {
+ public:
+  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
+  ~HandleTable() { delete[] list_; }
+
+  RLHandle* Lookup(const Slice& key, uint32_t hash) {
+    return *FindPointer(key, hash);
+  }
+
+  RLHandle* Insert(RLHandle* h) {
+    RLHandle** ptr = FindPointer(h->key(), h->hash);
+    RLHandle* old = *ptr;
+    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
+    *ptr = h;
+    if (old == nullptr) {
+      ++elems_;
+      if (elems_ > length_) {
+        // Since each cache entry is fairly large, we aim for a small
+        // average linked list length (<= 1).
+        Resize();
+      }
+    }
+    return old;
+  }
+
+  RLHandle* Remove(const Slice& key, uint32_t hash) {
+    RLHandle** ptr = FindPointer(key, hash);
+    RLHandle* result = *ptr;
+    if (result != nullptr) {
+      *ptr = result->next_hash;
+      --elems_;
+    }
+    return result;
+  }
+
+ private:
+  // The table consists of an array of buckets where each bucket is
+  // a linked list of cache entries that hash into the bucket.
+  uint32_t length_;
+  uint32_t elems_;
+  RLHandle** list_;
+
+  // Return a pointer to slot that points to a cache entry that
+  // matches key/hash.  If there is no such cache entry, return a
+  // pointer to the trailing slot in the corresponding linked list.
+  RLHandle** FindPointer(const Slice& key, uint32_t hash) {
+    RLHandle** ptr = &list_[hash & (length_ - 1)];
+    while (*ptr != nullptr &&
+           ((*ptr)->hash != hash || key != (*ptr)->key())) {
+      ptr = &(*ptr)->next_hash;
+    }
+    return ptr;
+  }
+
+  void Resize() {
+    uint32_t new_length = 16;
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    auto new_list = new RLHandle*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);
+    uint32_t count = 0;
+    for (uint32_t i = 0; i < length_; i++) {
+      RLHandle* h = list_[i];
+      while (h != nullptr) {
+        RLHandle* next = h->next_hash;
+        uint32_t hash = h->hash;
+        RLHandle** ptr = &new_list[hash & (new_length - 1)];
+        h->next_hash = *ptr;
+        *ptr = h;
+        h = next;
+        count++;
+      }
+    }
+    DCHECK_EQ(elems_, count);
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+string ToString(Cache::EvictionPolicy p) {
+  switch (p) {
+    case Cache::EvictionPolicy::FIFO:
+      return "fifo";
+    case Cache::EvictionPolicy::LRU:
+      return "lru";
+    default:
+      LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
+      break;
+  }
+  return "unknown";
+}
+
+// A single shard of sharded cache.
+template<Cache::EvictionPolicy policy>
+class CacheShard {
+ public:
+  explicit CacheShard(MemTracker* tracker);
+  ~CacheShard();
+
+  // Separate from constructor so caller can easily make an array of CacheShard
+  void SetCapacity(size_t capacity) {
+    capacity_ = capacity;
+    max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
+  }
+
+  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
+
+  Cache::Handle* Insert(RLHandle* handle, Cache::EvictionCallback* eviction_callback);
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Cache::Handle* handle);
+  void Erase(const Slice& key, uint32_t hash);
+  size_t Invalidate(const Cache::InvalidationControl& ctl);
+
+ private:
+  void RL_Remove(RLHandle* e);
+  void RL_Append(RLHandle* e);
+  // Update the recency list after a lookup operation.
+  void RL_UpdateAfterLookup(RLHandle* e);
+  // Just reduce the reference count by 1.
+  // Return true if last reference
+  bool Unref(RLHandle* e);
+  // Call the user's eviction callback, if it exists, and free the entry.
+  void FreeEntry(RLHandle* e);
+
+
+  // Update the memtracker's consumption by the given amount.
+  //
+  // This "buffers" the updates locally in 'deferred_consumption_' until the amount
+  // of accumulated delta is more than ~1% of the cache capacity. This improves
+  // performance under workloads with high eviction rates for a few reasons:
+  //
+  // 1) once the cache reaches its full capacity, we expect it to remain there
+  // in steady state. Each insertion is usually matched by an eviction, and unless
+  // the total size of the evicted item(s) is much different than the size of the
+  // inserted item, each eviction event is unlikely to change the total cache usage
+  // much. So, we expect that the accumulated error will mostly remain around 0
+  // and we can avoid propagating changes to the MemTracker at all.
+  //
+  // 2) because the cache implementation is sharded, we do this tracking in a bunch
+  // of different locations, avoiding bouncing cache-lines between cores. By contrast
+  // the MemTracker is a simple integer, so it doesn't scale as well under concurrency.
+  //
+  // Positive delta indicates an increased memory consumption.
+  void UpdateMemTracker(int64_t delta);
+
+  // Update the metrics for a lookup operation in the cache.
+  void UpdateMetricsLookup(bool was_hit, bool caching);
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  simple_spinlock mutex_;
+  size_t usage_;
+
+  // Dummy head of recency list.
+  // rl.prev is newest entry, rl.next is oldest entry.
+  RLHandle rl_;
+
+  HandleTable table_;
+
+  MemTracker* mem_tracker_;
+  atomic<int64_t> deferred_consumption_ { 0 };
+
+  // Initialized based on capacity_ to ensure an upper bound on the error on the
+  // MemTracker consumption.
+  int64_t max_deferred_consumption_;
+
+  CacheMetrics* metrics_;
+};
+
+template<Cache::EvictionPolicy policy>
+CacheShard<policy>::CacheShard(MemTracker* tracker)
+    : usage_(0),
+      mem_tracker_(tracker),
+      metrics_(nullptr) {
+  // Make empty circular linked list.
+  rl_.next = &rl_;
+  rl_.prev = &rl_;
+}
+
+template<Cache::EvictionPolicy policy>
+CacheShard<policy>::~CacheShard() {
+  for (RLHandle* e = rl_.next; e != &rl_; ) {
+    RLHandle* next = e->next;
+    DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1)
+        << "caller has an unreleased handle";
+    if (Unref(e)) {
+      FreeEntry(e);
+    }
+    e = next;
+  }
+  mem_tracker_->Consume(deferred_consumption_);
+}
+
+template<Cache::EvictionPolicy policy>
+bool CacheShard<policy>::Unref(RLHandle* e) {
+  DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
+  return e->refs.fetch_sub(1) == 1;
+}
+
+template<Cache::EvictionPolicy policy>
+void CacheShard<policy>::FreeEntry(RLHandle* e) {
+  DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
+  if (e->eviction_callback) {
+    e->eviction_callback->EvictedEntry(e->key(), e->value());
+  }
+  UpdateMemTracker(-static_cast<int64_t>(e->charge));
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->DecrementBy(e->charge);
+    metrics_->evictions->Increment();
+  }
+  delete [] e;
+}
+
+template<Cache::EvictionPolicy policy>
+void CacheShard<policy>::UpdateMemTracker(int64_t delta) {
+  int64_t old_deferred = deferred_consumption_.fetch_add(delta);
+  int64_t new_deferred = old_deferred + delta;
+
+  if (new_deferred > max_deferred_consumption_ ||
+      new_deferred < -max_deferred_consumption_) {
+    int64_t to_propagate = deferred_consumption_.exchange(0, std::memory_order_relaxed);
+    mem_tracker_->Consume(to_propagate);
+  }
+}
+
+template<Cache::EvictionPolicy policy>
+void CacheShard<policy>::UpdateMetricsLookup(bool was_hit, bool caching) {
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->lookups->Increment();
+    if (was_hit) {
+      if (caching) {
+        metrics_->cache_hits_caching->Increment();
+      } else {
+        metrics_->cache_hits->Increment();
+      }
+    } else {
+      if (caching) {
+        metrics_->cache_misses_caching->Increment();
+      } else {
+        metrics_->cache_misses->Increment();
+      }
+    }
+  }
+}
+
+template<Cache::EvictionPolicy policy>
+void CacheShard<policy>::RL_Remove(RLHandle* e) {
+  e->next->prev = e->prev;
+  e->prev->next = e->next;
+  DCHECK_GE(usage_, e->charge);
+  usage_ -= e->charge;
+}
+
+template<Cache::EvictionPolicy policy>
+void CacheShard<policy>::RL_Append(RLHandle* e) {
+  // Make "e" newest entry by inserting just before rl_.
+  e->next = &rl_;
+  e->prev = rl_.prev;
+  e->prev->next = e;
+  e->next->prev = e;
+  usage_ += e->charge;
+}
+
+template<>
+void CacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle* /* e */) {
+}
+
+template<>
+void CacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) {
+  RL_Remove(e);
+  RL_Append(e);
+}
+
+template<Cache::EvictionPolicy policy>
+Cache::Handle* CacheShard<policy>::Lookup(const Slice& key,
+                                          uint32_t hash,
+                                          bool caching) {
+  RLHandle* e;
+  {
+    std::lock_guard<decltype(mutex_)> l(mutex_);
+    e = table_.Lookup(key, hash);
+    if (e != nullptr) {
+      e->refs.fetch_add(1, std::memory_order_relaxed);
+      RL_UpdateAfterLookup(e);
+    }
+  }
+
+  // Do the metrics outside of the lock.
+  UpdateMetricsLookup(e != nullptr, caching);
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+template<Cache::EvictionPolicy policy>
+void CacheShard<policy>::Release(Cache::Handle* handle) {
+  RLHandle* e = reinterpret_cast<RLHandle*>(handle);
+  bool last_reference = Unref(e);
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+template<Cache::EvictionPolicy policy>
+Cache::Handle* CacheShard<policy>::Insert(
+    RLHandle* handle,
+    Cache::EvictionCallback* eviction_callback) {
+  // Set the remaining RLHandle members which were not already allocated during
+  // Allocate().
+  handle->eviction_callback = eviction_callback;
+  // Two refs for the handle: one from CacheShard, one for the returned handle.
+  handle->refs.store(2, std::memory_order_relaxed);
+  UpdateMemTracker(handle->charge);
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->IncrementBy(handle->charge);
+    metrics_->inserts->Increment();
+  }
+
+  RLHandle* to_remove_head = nullptr;
+  {
+    std::lock_guard<decltype(mutex_)> l(mutex_);
+
+    RL_Append(handle);
+
+    RLHandle* old = table_.Insert(handle);
+    if (old != nullptr) {
+      RL_Remove(old);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+
+    while (usage_ > capacity_ && rl_.next != &rl_) {
+      RLHandle* old = rl_.next;
+      RL_Remove(old);
+      table_.Remove(old->key(), old->hash);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+  }
+
+  // we free the entries here outside of mutex for
+  // performance reasons
+  while (to_remove_head != nullptr) {
+    RLHandle* next = to_remove_head->next;
+    FreeEntry(to_remove_head);
+    to_remove_head = next;
+  }
+
+  return reinterpret_cast<Cache::Handle*>(handle);
+}
+
+template<Cache::EvictionPolicy policy>
+void CacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
+  RLHandle* e;
+  bool last_reference = false;
+  {
+    std::lock_guard<decltype(mutex_)> l(mutex_);
+    e = table_.Remove(key, hash);
+    if (e != nullptr) {
+      RL_Remove(e);
+      last_reference = Unref(e);
+    }
+  }
+  // mutex not held here
+  // last_reference will only be true if e != NULL
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+template<Cache::EvictionPolicy policy>
+size_t CacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
+  size_t invalid_entry_count = 0;
+  size_t valid_entry_count = 0;
+  RLHandle* to_remove_head = nullptr;
+
+  {
+    std::lock_guard<decltype(mutex_)> l(mutex_);
+
+    // rl_.next is the oldest (a.k.a. least relevant) entry in the recency list.
+    RLHandle* h = rl_.next;
+    while (h != nullptr && h != &rl_ &&
+           ctl.iteration_func(valid_entry_count, invalid_entry_count)) {
+      if (ctl.validity_func(h->key(), h->value())) {
+        // Continue iterating over the list.
+        h = h->next;
+        ++valid_entry_count;
+        continue;
+      }
+      // Copy the handle slated for removal.
+      RLHandle* h_to_remove = h;
+      // Prepare for next iteration of the cycle.
+      h = h->next;
+
+      RL_Remove(h_to_remove);
+      table_.Remove(h_to_remove->key(), h_to_remove->hash);
+      if (Unref(h_to_remove)) {
+        h_to_remove->next = to_remove_head;
+        to_remove_head = h_to_remove;
+      }
+      ++invalid_entry_count;
+    }
+  }
+  // Once removed from the lookup table and the recency list, the entries
+  // with no references left must be deallocated because Cache::Release()
+  // wont be called for them from elsewhere.
+  while (to_remove_head != nullptr) {
+    RLHandle* next = to_remove_head->next;
+    FreeEntry(to_remove_head);
+    to_remove_head = next;
+  }
+  return invalid_entry_count;
+}
+
+// Determine the number of bits of the hash that should be used to determine
+// the cache shard. This, in turn, determines the number of shards.
+int DetermineShardBits() {
+  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
+      0 : Bits::Log2Ceiling(base::NumCPUs());
+  VLOG(1) << "Will use " << (1 << bits) << " shards for recency list cache.";
+  return bits;
+}
+
+template<Cache::EvictionPolicy policy>
+class ShardedCache : public Cache {
+ public:
+  explicit ShardedCache(size_t capacity, const string& id)
+        : shard_bits_(DetermineShardBits()) {
+    // A cache is often a singleton, so:
+    // 1. We reuse its MemTracker if one already exists, and
+    // 2. It is directly parented to the root MemTracker.
+    mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
+        -1, strings::Substitute("$0-sharded_$1_cache", id, ToString(policy)));
+
+    int num_shards = 1 << shard_bits_;
+    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
+    for (int s = 0; s < num_shards; s++) {
+      unique_ptr<CacheShard<policy>> shard(
+          new CacheShard<policy>(mem_tracker_.get()));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());
+    }
+  }
+
+  virtual ~ShardedCache() {
+    STLDeleteElements(&shards_);
+  }
+
+  void SetMetrics(std::unique_ptr<CacheMetrics> metrics) override {
+    // TODO(KUDU-2165): reuse of the Cache singleton across multiple MiniCluster servers
+    // causes TSAN errors. So, we'll ensure that metrics only get attached once, from
+    // whichever server starts first. This has the downside that, in test builds, we won't
+    // get accurate cache metrics, but that's probably better than spurious failures.
+    std::lock_guard<decltype(metrics_lock_)> l(metrics_lock_);
+    if (metrics_) {
+      CHECK(IsGTest()) << "Metrics should only be set once per Cache singleton";
+      return;
+    }
+    metrics_ = std::move(metrics);
+    for (auto* cache : shards_) {
+      cache->SetMetrics(metrics_.get());
+    }
+  }
+
+  UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override {
+    const uint32_t hash = HashSlice(key);
+    return UniqueHandle(
+        shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE),
+        Cache::HandleDeleter(this));
+  }
+
+  void Erase(const Slice& key) override {
+    const uint32_t hash = HashSlice(key);
+    shards_[Shard(hash)]->Erase(key, hash);
+  }
+
+  Slice Value(const UniqueHandle& handle) const override {
+    return reinterpret_cast<const RLHandle*>(handle.get())->value();
+  }
+
+  UniqueHandle Insert(UniquePendingHandle handle,
+                      Cache::EvictionCallback* eviction_callback) override {
+    RLHandle* h = reinterpret_cast<RLHandle*>(DCHECK_NOTNULL(handle.release()));
+    return UniqueHandle(
+        shards_[Shard(h->hash)]->Insert(h, eviction_callback),
+        Cache::HandleDeleter(this));
+  }
+
+  UniquePendingHandle Allocate(Slice key, int val_len, int charge) override {
+    int key_len = key.size();
+    DCHECK_GE(key_len, 0);
+    DCHECK_GE(val_len, 0);
+    int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
+    UniquePendingHandle h(reinterpret_cast<PendingHandle*>(
+        new uint8_t[sizeof(RLHandle)
+                    + key_len_padded + val_len // the kv_data VLA data
+                    - 1 // (the VLA has a 1-byte placeholder)
+                   ]),
+        PendingHandleDeleter(this));
+    RLHandle* handle = reinterpret_cast<RLHandle*>(h.get());
+    handle->key_length = key_len;
+    handle->val_length = val_len;
+    // TODO(KUDU-1091): account for the footprint of structures used by Cache's
+    //                  internal housekeeping (RL handles, etc.) in case of
+    //                  non-automatic charge.
+    handle->charge = (charge == kAutomaticCharge) ? kudu_malloc_usable_size(h.get())
+                                                  : charge;
+    handle->hash = HashSlice(key);
+    memcpy(handle->kv_data, key.data(), key_len);
+
+    return h;
+  }
+
+  uint8_t* MutableValue(UniquePendingHandle* handle) override {
+    return reinterpret_cast<RLHandle*>(handle->get())->mutable_val_ptr();
+  }
+
+  size_t Invalidate(const InvalidationControl& ctl) override {
+    size_t invalidated_count = 0;
+    for (auto& shard: shards_) {
+      invalidated_count += shard->Invalidate(ctl);
+    }
+    return invalidated_count;
+  }
+
+ protected:
+  void Release(Handle* handle) override {
+    RLHandle* h = reinterpret_cast<RLHandle*>(handle);
+    shards_[Shard(h->hash)]->Release(handle);
+  }
+
+  void Free(PendingHandle* h) override {
+    uint8_t* data = reinterpret_cast<uint8_t*>(h);
+    delete [] data;
+  }
+
+ private:
+  static inline uint32_t HashSlice(const Slice& s) {
+    return util_hash::CityHash64(
+      reinterpret_cast<const char *>(s.data()), s.size());
+  }
+
+  uint32_t Shard(uint32_t hash) {
+    // Widen to uint64 before shifting, or else on a single CPU,
+    // we would try to shift a uint32_t by 32 bits, which is undefined.
+    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
+  }
+
+  shared_ptr<MemTracker> mem_tracker_;
+  unique_ptr<CacheMetrics> metrics_;
+  vector<CacheShard<policy>*> shards_;
+
+  // Number of bits of hash used to determine the shard.
+  const int shard_bits_;
+
+  // Protects 'metrics_'. Used only when metrics are set, to ensure
+  // that they are set only once in test environments.
+  simple_spinlock metrics_lock_;
+};
+
+}  // end anonymous namespace
+
+template<>
+Cache* NewCache<Cache::EvictionPolicy::FIFO,
+                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
+  return new ShardedCache<Cache::EvictionPolicy::FIFO>(capacity, id);
+}
+
+template<>
+Cache* NewCache<Cache::EvictionPolicy::LRU,
+                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
+  return new ShardedCache<Cache::EvictionPolicy::LRU>(capacity, id);
+}
+
+std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
+  switch (mem_type) {
+    case Cache::MemoryType::DRAM:
+      os << "DRAM";
+      break;
+    case Cache::MemoryType::NVM:
+      os << "NVM";
+      break;
+    default:
+      os << "unknown (" << static_cast<int>(mem_type) << ")";
+      break;
+  }
+  return os;
+}
+
+}  // namespace kudu
diff --git a/be/src/util/cache/cache.h b/be/src/util/cache/cache.h
new file mode 100644
index 0000000..ff3a309
--- /dev/null
+++ b/be/src/util/cache/cache.h
@@ -0,0 +1,363 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A Cache is an interface that maps keys to values.  It has internal
+// synchronization and may be safely accessed concurrently from
+// multiple threads.  It may automatically evict entries to make room
+// for new entries.  Values have a specified charge against the cache
+// capacity.  For example, a cache where the values are variable
+// length strings, may use the length of the string as the charge for
+// the string.
+//
+// This is taken from LevelDB and evolved to fit the kudu codebase.
+//
+// TODO(unknown): this is pretty lock-heavy. Would be good to sub out something
+// a little more concurrent.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <iosfwd>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+struct CacheMetrics;
+
+class Cache {
+ public:
+  // Type of memory backing the cache's storage.
+  enum class MemoryType {
+    DRAM,
+    NVM,
+  };
+
+  // Supported eviction policies for the cache. Eviction policy determines what
+  // items to evict if the cache is at capacity when trying to accommodate
+  // an extra item.
+  enum class EvictionPolicy {
+    // The earliest added items are evicted (a.k.a. queue).
+    FIFO,
+
+    // The least-recently-used items are evicted.
+    LRU,
+  };
+
+  // Callback interface which is called when an entry is evicted from the
+  // cache.
+  class EvictionCallback {
+   public:
+    virtual void EvictedEntry(Slice key, Slice value) = 0;
+    virtual ~EvictionCallback() = default;
+  };
+
+  Cache() = default;
+
+  // Destroys all existing entries by calling the "deleter"
+  // function that was passed to the constructor.
+  virtual ~Cache();
+
+  // Set the cache metrics to update corresponding counters accordingly.
+  virtual void SetMetrics(std::unique_ptr<CacheMetrics> metrics) = 0;
+
+  // Opaque handle to an entry stored in the cache.
+  struct Handle { };
+
+  // Custom deleter: intended for use with std::unique_ptr<Handle>.
+  class HandleDeleter {
+   public:
+    explicit HandleDeleter(Cache* c)
+        : c_(c) {
+    }
+
+    void operator()(Cache::Handle* h) const {
+      if (h != nullptr) {
+        c_->Release(h);
+      }
+    }
+
+    Cache* cache() const {
+      return c_;
+    }
+
+   private:
+    Cache* c_;
+  };
+
+  // UniqueHandle -- a wrapper around opaque Handle structure to facilitate
+  // automatic reference counting of cache's handles.
+  typedef std::unique_ptr<Handle, HandleDeleter> UniqueHandle;
+
+  // Opaque handle to an entry which is being prepared to be added to the cache.
+  struct PendingHandle { };
+
+  // Custom deleter: intended for use with std::unique_ptr<PendingHandle>.
+  class PendingHandleDeleter {
+   public:
+    explicit PendingHandleDeleter(Cache* c)
+        : c_(c) {
+    }
+
+    void operator()(Cache::PendingHandle* h) const {
+      if (h != nullptr) {
+        c_->Free(h);
+      }
+    }
+
+    Cache* cache() const {
+      return c_;
+    }
+
+   private:
+    Cache* c_;
+  };
+
+  // UniquePendingHandle -- a wrapper around opaque PendingHandle structure
+  // to facilitate automatic reference counting newly allocated cache's handles.
+  typedef std::unique_ptr<PendingHandle, PendingHandleDeleter> UniquePendingHandle;
+
+  // Passing EXPECT_IN_CACHE will increment the hit/miss metrics that track the number of times
+  // blocks were requested that the users were hoping to get the block from the cache, along with
+  // with the basic metrics.
+  // Passing NO_EXPECT_IN_CACHE will only increment the basic metrics.
+  // This helps in determining if we are effectively caching the blocks that matter the most.
+  enum CacheBehavior {
+    EXPECT_IN_CACHE,
+    NO_EXPECT_IN_CACHE
+  };
+
+  // If the cache has no mapping for "key", returns NULL.
+  //
+  // Else return a handle that corresponds to the mapping.
+  //
+  // Sample usage:
+  //
+  //   unique_ptr<Cache> cache(NewCache(...));
+  //   ...
+  //   {
+  //     Cache::UniqueHandle h(cache->Lookup(...)));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  // Or:
+  //
+  //   unique_ptr<Cache> cache(NewCache(...));
+  //   ...
+  //   {
+  //     auto h(cache->Lookup(...)));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  virtual UniqueHandle Lookup(const Slice& key, CacheBehavior caching) = 0;
+
+  // If the cache contains entry for key, erase it.  Note that the
+  // underlying entry will be kept around until all existing handles
+  // to it have been released.
+  virtual void Erase(const Slice& key) = 0;
+
+  // Return the value encapsulated in a raw handle returned by a successful
+  // Lookup().
+  virtual Slice Value(const UniqueHandle& handle) const = 0;
+
+
+  // ------------------------------------------------------------
+  // Insertion path
+  // ------------------------------------------------------------
+  //
+  // Because some cache implementations (eg NVM) manage their own memory, and because we'd
+  // like to read blocks directly into cache-managed memory rather than causing an extra
+  // memcpy, the insertion of a new element into the cache requires two phases. First, a
+  // PendingHandle is allocated with space for the value, and then it is later inserted.
+  //
+  // For example:
+  //
+  //   auto ph(cache_->Allocate("my entry", value_size, charge));
+  //   if (!ReadDataFromDisk(cache_->MutableValue(&ph)).ok()) {
+  //     ... error handling ...
+  //     return;
+  //   }
+  //   UniqueHandle h(cache_->Insert(std::move(ph), my_eviction_callback));
+  //   ...
+  //   // 'h' is automatically released.
+
+  // Indicates that the charge of an item in the cache should be calculated
+  // based on its memory consumption.
+  static constexpr int kAutomaticCharge = -1;
+
+  // Allocate space for a new entry to be inserted into the cache.
+  //
+  // The provided 'key' is copied into the resulting handle object.
+  // The allocated handle has enough space such that the value can
+  // be written into cache_->MutableValue(&handle).
+  //
+  // If 'charge' is not 'kAutomaticCharge', then the cache capacity will be charged
+  // the explicit amount. This is useful when caching items that are small but need to
+  // maintain a bounded count (eg file descriptors) rather than caring about their actual
+  // memory usage. It is also useful when caching items for whom calculating
+  // memory usage is a complex affair (i.e. items containing pointers to
+  // additional heap allocations).
+  //
+  // Note that this does not mutate the cache itself: lookups will
+  // not be able to find the provided key until it is inserted.
+  //
+  // It is possible that this will return a nullptr wrapped in a std::unique_ptr
+  // if the cache is above its capacity and eviction fails to free up enough
+  // space for the requested allocation.
+  //
+  // The returned handle owns the allocated memory.
+  virtual UniquePendingHandle Allocate(Slice key, int val_len, int charge) = 0;
+
+  // Default 'charge' should be kAutomaticCharge
+  // (default arguments on virtual functions are prohibited).
+  UniquePendingHandle Allocate(Slice key, int val_len) {
+    return Allocate(key, val_len, kAutomaticCharge);
+  }
+
+  virtual uint8_t* MutableValue(UniquePendingHandle* handle) = 0;
+
+  // Commit a prepared entry into the cache.
+  //
+  // Returns a handle that corresponds to the mapping. This method always
+  // succeeds and returns a non-null entry, since the space was reserved above.
+  //
+  // The 'pending' entry passed here should have been allocated using
+  // Cache::Allocate() above.
+  //
+  // If 'eviction_callback' is non-NULL, then it will be called when the
+  // entry is later evicted or when the cache shuts down.
+  virtual UniqueHandle Insert(UniquePendingHandle pending,
+                              EvictionCallback* eviction_callback) = 0;
+
+  // Forward declaration to simplify the layout of types/typedefs needed for the
+  // Invalidate() method while trying to adhere to the code style guide.
+  struct InvalidationControl;
+
+  // Invalidate cache's entries, effectively evicting non-valid ones from the
+  // cache. The invalidation process iterates over the cache's recency list(s),
+  // from best candidate for eviction to the worst.
+  //
+  // The provided control structure 'ctl' is responsible for the following:
+  //   * determine whether an entry is valid or not
+  //   * determine how to iterate over the entries in the cache's recency list
+  //
+  // NOTE: The invalidation process might hold a lock while iterating over
+  //       the cache's entries. Using proper IterationFunc might help to reduce
+  //       contention with the concurrent request for the cache's contents.
+  //       See the in-line documentation for IterationFunc for more details.
+  virtual size_t Invalidate(const InvalidationControl& ctl) = 0;
+
+  // Functor to define a criterion on a cache entry's validity. Upon call
+  // of Cache::Invalidate() method, if the functor returns 'false' for the
+  // specified key and value, the cache evicts the entry, otherwise the entry
+  // stays in the cache.
+  typedef std::function<bool(Slice /* key */,
+                             Slice /* value */)>
+      ValidityFunc;
+
+  // Functor to define whether to continue or stop iterating over the cache's
+  // entries based on the number of encountered invalid and valid entries
+  // during the Cache::Invalidate() call. If a cache contains multiple
+  // sub-caches (e.g., shards), those parameters are per sub-cache. For example,
+  // in case of multi-shard cache, when the 'iteration_func' returns 'false',
+  // the invalidation at current shard stops and switches to the next
+  // non-yet-processed shard, if any is present.
+  //
+  // The choice of the signature for the iteration functor is to allow for
+  // effective purging of non-valid (e.g., expired) entries in caches with
+  // the FIFO eviction policy (e.g., TTL caches).
+  //
+  // The first parameter of the functor is useful for short-circuiting
+  // the invalidation process once some valid entries have been encountered.
+  // For example, that's useful in case if the recency list has its entries
+  // ordered in FIFO-like order (e.g., TTL cache with FIFO eviction policy),
+  // so most-likely-invalid entries are in the very beginning of the list.
+  // In the latter case, once a valid (e.g., not yet expired) entry is
+  // encountered, there is no need to iterate any further: all the entries past
+  // the first valid one in the recency list should be valid as well.
+  //
+  // The second parameter is useful when the validity criterion is fuzzy,
+  // but there is a target number of entries to invalidate during each
+  // invocation of the Invalidate() method or there is some logic that reads
+  // the cache's metric(s) once the given number of entries have been evicted:
+  // e.g., compare the result memory footprint of the cache against a threshold
+  // to decide whether to continue invalidation of entries.
+  //
+  // Summing both parameters of the functor is useful when it's necessary to
+  // limit the number of entries processed per one invocation of the
+  // Invalidate() method. It makes sense in cases when a 'lazy' invalidation
+  // process is run by a periodic task along with a significant amount of
+  // concurrent requests to the cache, and the number of entries in the cache
+  // is huge. Given the fact that in most cases it's necessary to guard
+  // the access to the cache's recency list while iterating over it entries,
+  // limiting the number of entries to process at once allows for better control
+  // over the duration of the guarded/locked sections.
+  typedef std::function<bool(size_t /* valid_entries_num */,
+                             size_t /* invalid_entries_num */)>
+      IterationFunc;
+
+  // A helper function for 'validity_func' of the Invalidate() method:
+  // invalidate all entries.
+  static const ValidityFunc kInvalidateAllEntriesFunc;
+
+  // A helper function for 'iteration_func' of the Invalidate() method:
+  // examine all entries.
+  static const IterationFunc kIterateOverAllEntriesFunc;
+
+  // Control structure for the Invalidate() method. Combines the validity
+  // and the iteration functors.
+  struct InvalidationControl {
+    // NOLINTNEXTLINE(google-explicit-constructor)
+    InvalidationControl(ValidityFunc vfunctor = kInvalidateAllEntriesFunc,
+                        IterationFunc ifunctor = kIterateOverAllEntriesFunc)
+        : validity_func(std::move(vfunctor)),
+          iteration_func(std::move(ifunctor)) {
+    }
+    const ValidityFunc validity_func;
+    const IterationFunc iteration_func;
+  };
+
+ protected:
+  // Release a mapping returned by a previous Lookup(), using raw handle.
+  // REQUIRES: handle must not have been released yet.
+  // REQUIRES: handle must have been returned by a method on *this.
+  virtual void Release(Handle* handle) = 0;
+
+  // Free 'ptr', which must have been previously allocated using 'Allocate'.
+  virtual void Free(PendingHandle* ptr) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Cache);
+};
+
+// A template helper function to instantiate a cache of particular
+// 'eviction_policy' flavor, backed by the given storage 'mem_type',
+// where 'capacity' specifies the capacity of the result cache,
+// and 'id' specifies its identifier.
+template<Cache::EvictionPolicy eviction_policy = Cache::EvictionPolicy::LRU,
+         Cache::MemoryType mem_type = Cache::MemoryType::DRAM>
+Cache* NewCache(size_t capacity, const std::string& id);
+
+// Create a new FIFO cache with a fixed size capacity. This implementation
+// of Cache uses the first-in-first-out eviction policy and stored in DRAM.
+template<>
+Cache* NewCache<Cache::EvictionPolicy::FIFO,
+                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id);
+
+// Create a new LRU cache with a fixed size capacity. This implementation
+// of Cache uses the least-recently-used eviction policy and stored in DRAM.
+template<>
+Cache* NewCache<Cache::EvictionPolicy::LRU,
+                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id);
+
+// A helper method to output cache memory type into ostream.
+std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type);
+
+} // namespace kudu
diff --git a/be/src/util/cache/nvm_cache.cc b/be/src/util/cache/nvm_cache.cc
new file mode 100644
index 0000000..d7860ac
--- /dev/null
+++ b/be/src/util/cache/nvm_cache.cc
@@ -0,0 +1,719 @@
+// This file is derived from cache.cc in the LevelDB project:
+//
+//   Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+//   Use of this source code is governed by a BSD-style license that can be
+//   found in the LICENSE file.
+//
+// ------------------------------------------------------------
+// This file implements a cache based on the MEMKIND library (http://memkind.github.io/memkind/)
+// This library makes it easy to program against persistent memory hardware by exposing an API
+// which parallels malloc/free, but allocates from persistent memory instead of DRAM.
+//
+// We use this API to implement a cache which treats persistent memory or
+// non-volatile memory as if it were a larger cheaper bank of volatile memory. We
+// currently make no use of its persistence properties.
+//
+// Currently, we only store key/value in NVM. All other data structures such as the
+// ShardedLRUCache instances, hash table, etc are in DRAM. The assumption is that
+// the ratio of data stored vs overhead is quite high.
+
+#include "kudu/util/nvm_cache.h"
+
+#include <dlfcn.h>
+
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomic_refcount.h"
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+#ifndef MEMKIND_PMEM_MIN_SIZE
+#define MEMKIND_PMEM_MIN_SIZE (1024 * 1024 * 16) // Taken from memkind 1.9.0.
+#endif
+
+struct memkind;
+
+// Useful in tests that require accurate cache capacity accounting.
+DECLARE_bool(cache_force_single_shard);
+
+DEFINE_string(nvm_cache_path, "/pmem",
+              "The path at which the NVM cache will try to allocate its memory. "
+              "This can be a tmpfs or ramfs for testing purposes.");
+
+DEFINE_bool(nvm_cache_simulate_allocation_failure, false,
+            "If true, the NVM cache will inject failures in calls to memkind_malloc "
+            "for testing.");
+TAG_FLAG(nvm_cache_simulate_allocation_failure, unsafe);
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+// Taken together, these typedefs and this macro make it easy to call a
+// memkind function:
+//
+//  CALL_MEMKIND(memkind_malloc, vmp_, size);
+typedef int (*memkind_create_pmem)(const char*, size_t, memkind**);
+typedef int (*memkind_destroy_kind)(memkind*);
+typedef void* (*memkind_malloc)(memkind*, size_t);
+typedef size_t (*memkind_malloc_usable_size)(memkind*, void*);
+typedef void (*memkind_free)(memkind*, void*);
+#define CALL_MEMKIND(func_name, ...) ((func_name)g_##func_name)(__VA_ARGS__)
+
+// Function pointers into memkind; set by InitMemkindOps().
+void* g_memkind_create_pmem;
+void* g_memkind_destroy_kind;
+void* g_memkind_malloc;
+void* g_memkind_malloc_usable_size;
+void* g_memkind_free;
+
+// After InitMemkindOps() is called, true if memkind is available and safe
+// to use, false otherwise.
+bool g_memkind_available;
+
+std::once_flag g_memkind_ops_flag;
+
+// Try to dlsym() a particular symbol from 'handle', storing the result in 'ptr'
+// if successful.
+Status TryDlsym(void* handle, const char* sym, void** ptr) {
+  dlerror(); // Need to clear any existing error first.
+  void* ret = dlsym(handle, sym);
+  char* error = dlerror();
+  if (error) {
+    return Status::NotSupported(Substitute("could not dlsym $0", sym), error);
+  }
+  *ptr = ret;
+  return Status::OK();
+}
+
+// Try to dlopen() memkind and set up all the function pointers we need from it.
+//
+// Note: in terms of protecting ourselves against changes in memkind, we'll
+// notice (and fail) if a symbol is missing, but not if it's signature has
+// changed or if there's some subtle behavioral change. A scan of the memkind
+// repo suggests that backwards compatibility is enforced: symbols are only
+// added and behavioral changes are effected via the introduction of new symbols.
+void InitMemkindOps() {
+  g_memkind_available = false;
+
+  // Use RTLD_NOW so that if any of memkind's dependencies aren't satisfied
+  // (e.g. libnuma is too old and is missing symbols), we'll know up front
+  // instead of during cache operations.
+  void* memkind_lib = dlopen("libmemkind.so.0", RTLD_NOW);
+  if (!memkind_lib) {
+    LOG(WARNING) << "could not dlopen: " << dlerror();
+    return;
+  }
+  auto cleanup = MakeScopedCleanup([&]() {
+    dlclose(memkind_lib);
+  });
+
+#define DLSYM_OR_RETURN(func_name, handle) do { \
+    const Status _s = TryDlsym(memkind_lib, func_name, handle); \
+    if (!_s.ok()) { \
+      LOG(WARNING) << _s.ToString(); \
+      return; \
+    } \
+  } while (0)
+
+  DLSYM_OR_RETURN("memkind_create_pmem", &g_memkind_create_pmem);
+  DLSYM_OR_RETURN("memkind_destroy_kind", &g_memkind_destroy_kind);
+  DLSYM_OR_RETURN("memkind_malloc", &g_memkind_malloc);
+  DLSYM_OR_RETURN("memkind_malloc_usable_size", &g_memkind_malloc_usable_size);
+  DLSYM_OR_RETURN("memkind_free", &g_memkind_free);
+#undef DLSYM_OR_RETURN
+
+  g_memkind_available = true;
+
+  // Need to keep the memkind library handle open so our function pointers
+  // remain loaded in memory.
+  cleanup.cancel();
+}
+
+typedef simple_spinlock MutexType;
+
+// LRU cache implementation
+
+// An entry is a variable length heap-allocated structure.  Entries
+// are kept in a circular doubly linked list ordered by access time.
+struct LRUHandle {
+  Cache::EvictionCallback* eviction_callback;
+  LRUHandle* next_hash;
+  LRUHandle* next;
+  LRUHandle* prev;
+  size_t charge;      // TODO(opt): Only allow uint32_t?
+  uint32_t key_length;
+  uint32_t val_length;
+  Atomic32 refs;
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+  uint8_t* kv_data;
+
+  Slice key() const {
+    return Slice(kv_data, key_length);
+  }
+
+  Slice value() const {
+    return Slice(&kv_data[key_length], val_length);
+  }
+
+  uint8_t* val_ptr() {
+    return &kv_data[key_length];
+  }
+};
+
+// We provide our own simple hash table since it removes a whole bunch
+// of porting hacks and is also faster than some of the built-in hash
+// table implementations in some of the compiler/runtime combinations
+// we have tested.  E.g., readrandom speeds up by ~5% over the g++
+// 4.4.3's builtin hashtable.
+class HandleTable {
+ public:
+  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
+  ~HandleTable() { delete[] list_; }
+
+  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
+    return *FindPointer(key, hash);
+  }
+
+  LRUHandle* Insert(LRUHandle* h) {
+    LRUHandle** ptr = FindPointer(h->key(), h->hash);
+    LRUHandle* old = *ptr;
+    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
+    *ptr = h;
+    if (old == nullptr) {
+      ++elems_;
+      if (elems_ > length_) {
+        // Since each cache entry is fairly large, we aim for a small
+        // average linked list length (<= 1).
+        Resize();
+      }
+    }
+    return old;
+  }
+
+  LRUHandle* Remove(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = FindPointer(key, hash);
+    LRUHandle* result = *ptr;
+    if (result != nullptr) {
+      *ptr = result->next_hash;
+      --elems_;
+    }
+    return result;
+  }
+
+ private:
+  // The table consists of an array of buckets where each bucket is
+  // a linked list of cache entries that hash into the bucket.
+  uint32_t length_;
+  uint32_t elems_;
+  LRUHandle** list_;
+
+  // Return a pointer to slot that points to a cache entry that
+  // matches key/hash.  If there is no such cache entry, return a
+  // pointer to the trailing slot in the corresponding linked list.
+  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = &list_[hash & (length_ - 1)];
+    while (*ptr != nullptr &&
+           ((*ptr)->hash != hash || key != (*ptr)->key())) {
+      ptr = &(*ptr)->next_hash;
+    }
+    return ptr;
+  }
+
+  void Resize() {
+    uint32_t new_length = 16;
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    LRUHandle** new_list = new LRUHandle*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);
+    uint32_t count = 0;
+    for (uint32_t i = 0; i < length_; i++) {
+      LRUHandle* h = list_[i];
+      while (h != nullptr) {
+        LRUHandle* next = h->next_hash;
+        uint32_t hash = h->hash;
+        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
+        h->next_hash = *ptr;
+        *ptr = h;
+        h = next;
+        count++;
+      }
+    }
+    DCHECK_EQ(elems_, count);
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+// A single shard of sharded cache.
+class NvmLRUCache {
+ public:
+  explicit NvmLRUCache(memkind *vmp);
+  ~NvmLRUCache();
+
+  // Separate from constructor so caller can easily make an array of LRUCache
+  void SetCapacity(size_t capacity) { capacity_ = capacity; }
+
+  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
+
+  Cache::Handle* Insert(LRUHandle* e, Cache::EvictionCallback* eviction_callback);
+
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Cache::Handle* handle);
+  void Erase(const Slice& key, uint32_t hash);
+  size_t Invalidate(const Cache::InvalidationControl& ctl);
+  void* Allocate(size_t size);
+
+ private:
+  void NvmLRU_Remove(LRUHandle* e);
+  void NvmLRU_Append(LRUHandle* e);
+  // Just reduce the reference count by 1.
+  // Return true if last reference
+  bool Unref(LRUHandle* e);
+  void FreeEntry(LRUHandle* e);
+
+  // Evict the LRU item in the cache, adding it to the linked list
+  // pointed to by 'to_remove_head'.
+  void EvictOldestUnlocked(LRUHandle** to_remove_head);
+
+  // Free all of the entries in the linked list that has to_free_head
+  // as its head.
+  void FreeLRUEntries(LRUHandle* to_free_head);
+
+  // Wrapper around memkind_malloc which injects failures based on a flag.
+  void* MemkindMalloc(size_t size);
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  MutexType mutex_;
+  size_t usage_;
+
+  // Dummy head of LRU list.
+  // lru.prev is newest entry, lru.next is oldest entry.
+  LRUHandle lru_;
+
+  HandleTable table_;
+
+  memkind* vmp_;
+
+  CacheMetrics* metrics_;
+};
+
+NvmLRUCache::NvmLRUCache(memkind* vmp)
+  : usage_(0),
+  vmp_(vmp),
+  metrics_(nullptr) {
+  // Make empty circular linked list
+  lru_.next = &lru_;
+  lru_.prev = &lru_;
+}
+
+NvmLRUCache::~NvmLRUCache() {
+  for (LRUHandle* e = lru_.next; e != &lru_; ) {
+    LRUHandle* next = e->next;
+    DCHECK_EQ(e->refs, 1);  // Error if caller has an unreleased handle
+    if (Unref(e)) {
+      FreeEntry(e);
+    }
+    e = next;
+  }
+}
+
+void* NvmLRUCache::MemkindMalloc(size_t size) {
+  if (PREDICT_FALSE(FLAGS_nvm_cache_simulate_allocation_failure)) {
+    return nullptr;
+  }
+  return CALL_MEMKIND(memkind_malloc, vmp_, size);
+}
+
+bool NvmLRUCache::Unref(LRUHandle* e) {
+  DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  return !base::RefCountDec(&e->refs);
+}
+
+void NvmLRUCache::FreeEntry(LRUHandle* e) {
+  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  if (e->eviction_callback) {
+    e->eviction_callback->EvictedEntry(e->key(), e->value());
+  }
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->DecrementBy(e->charge);
+    metrics_->evictions->Increment();
+  }
+  CALL_MEMKIND(memkind_free, vmp_, e);
+}
+
+// Allocate NVM memory.
+void* NvmLRUCache::Allocate(size_t size) {
+  return MemkindMalloc(size);
+}
+
+void NvmLRUCache::NvmLRU_Remove(LRUHandle* e) {
+  e->next->prev = e->prev;
+  e->prev->next = e->next;
+  usage_ -= e->charge;
+}
+
+void NvmLRUCache::NvmLRU_Append(LRUHandle* e) {
+  // Make "e" newest entry by inserting just before lru_
+  e->next = &lru_;
+  e->prev = lru_.prev;
+  e->prev->next = e;
+  e->next->prev = e;
+  usage_ += e->charge;
+}
+
+Cache::Handle* NvmLRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
+ LRUHandle* e;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = table_.Lookup(key, hash);
+    if (e != nullptr) {
+      // If an entry exists, remove the old entry from the cache
+      // and re-add to the end of the linked list.
+      base::RefCountInc(&e->refs);
+      NvmLRU_Remove(e);
+      NvmLRU_Append(e);
+    }
+  }
+
+  // Do the metrics outside of the lock.
+  if (metrics_) {
+    metrics_->lookups->Increment();
+    bool was_hit = (e != nullptr);
+    if (was_hit) {
+      if (caching) {
+        metrics_->cache_hits_caching->Increment();
+      } else {
+        metrics_->cache_hits->Increment();
+      }
+    } else {
+      if (caching) {
+        metrics_->cache_misses_caching->Increment();
+      } else {
+        metrics_->cache_misses->Increment();
+      }
+    }
+  }
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+void NvmLRUCache::Release(Cache::Handle* handle) {
+  LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
+  bool last_reference = Unref(e);
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+void NvmLRUCache::EvictOldestUnlocked(LRUHandle** to_remove_head) {
+  LRUHandle* old = lru_.next;
+  NvmLRU_Remove(old);
+  table_.Remove(old->key(), old->hash);
+  if (Unref(old)) {
+    old->next = *to_remove_head;
+    *to_remove_head = old;
+  }
+}
+
+void NvmLRUCache::FreeLRUEntries(LRUHandle* to_free_head) {
+  while (to_free_head != nullptr) {
+    LRUHandle* next = to_free_head->next;
+    FreeEntry(to_free_head);
+    to_free_head = next;
+  }
+}
+
+Cache::Handle* NvmLRUCache::Insert(LRUHandle* e,
+                                   Cache::EvictionCallback* eviction_callback) {
+  DCHECK(e);
+  LRUHandle* to_remove_head = nullptr;
+
+  e->refs = 2;  // One from LRUCache, one for the returned handle
+  e->eviction_callback = eviction_callback;
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->IncrementBy(e->charge);
+    metrics_->inserts->Increment();
+  }
+
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    NvmLRU_Append(e);
+
+    LRUHandle* old = table_.Insert(e);
+    if (old != nullptr) {
+      NvmLRU_Remove(old);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+
+    while (usage_ > capacity_ && lru_.next != &lru_) {
+      EvictOldestUnlocked(&to_remove_head);
+    }
+  }
+
+  // we free the entries here outside of mutex for
+  // performance reasons
+  FreeLRUEntries(to_remove_head);
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+void NvmLRUCache::Erase(const Slice& key, uint32_t hash) {
+  LRUHandle* e;
+  bool last_reference = false;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = table_.Remove(key, hash);
+    if (e != nullptr) {
+      NvmLRU_Remove(e);
+      last_reference = Unref(e);
+    }
+  }
+  // mutex not held here
+  // last_reference will only be true if e != nullptr
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+size_t NvmLRUCache::Invalidate(const Cache::InvalidationControl& ctl) {
+  size_t invalid_entry_count = 0;
+  size_t valid_entry_count = 0;
+  LRUHandle* to_remove_head = nullptr;
+
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    // rl_.next is the oldest entry in the recency list.
+    LRUHandle* h = lru_.next;
+    while (h != nullptr && h != &lru_ &&
+           ctl.iteration_func(valid_entry_count, invalid_entry_count)) {
+      if (ctl.validity_func(h->key(), h->value())) {
+        // Continue iterating over the list.
+        h = h->next;
+        ++valid_entry_count;
+        continue;
+      }
+      // Copy the handle slated for removal.
+      LRUHandle* h_to_remove = h;
+      // Prepare for next iteration of the cycle.
+      h = h->next;
+
+      NvmLRU_Remove(h_to_remove);
+      table_.Remove(h_to_remove->key(), h_to_remove->hash);
+      if (Unref(h_to_remove)) {
+        h_to_remove->next = to_remove_head;
+        to_remove_head = h_to_remove;
+      }
+      ++invalid_entry_count;
+    }
+  }
+
+  FreeLRUEntries(to_remove_head);
+
+  return invalid_entry_count;
+}
+
+// Determine the number of bits of the hash that should be used to determine
+// the cache shard. This, in turn, determines the number of shards.
+int DetermineShardBits() {
+  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
+      0 : Bits::Log2Ceiling(base::NumCPUs());
+  VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache.";
+  return bits;
+}
+
+class ShardedLRUCache : public Cache {
+ private:
+  unique_ptr<CacheMetrics> metrics_;
+  vector<NvmLRUCache*> shards_;
+
+  // Number of bits of hash used to determine the shard.
+  const int shard_bits_;
+
+  memkind* vmp_;
+
+  static inline uint32_t HashSlice(const Slice& s) {
+    return util_hash::CityHash64(
+      reinterpret_cast<const char *>(s.data()), s.size());
+  }
+
+  uint32_t Shard(uint32_t hash) {
+    // Widen to uint64 before shifting, or else on a single CPU,
+    // we would try to shift a uint32_t by 32 bits, which is undefined.
+    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
+  }
+
+ public:
+  explicit ShardedLRUCache(size_t capacity, const string& /*id*/, memkind* vmp)
+      : shard_bits_(DetermineShardBits()),
+        vmp_(vmp) {
+    int num_shards = 1 << shard_bits_;
+    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
+    for (int s = 0; s < num_shards; s++) {
+      gscoped_ptr<NvmLRUCache> shard(new NvmLRUCache(vmp_));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());
+    }
+  }
+
+  virtual ~ShardedLRUCache() {
+    STLDeleteElements(&shards_);
+    // Per the note at the top of this file, our cache is entirely volatile.
+    // Hence, when the cache is destructed, we delete the underlying
+    // memkind pool.
+    CALL_MEMKIND(memkind_destroy_kind, vmp_);
+  }
+
+  virtual UniqueHandle Insert(UniquePendingHandle handle,
+                              Cache::EvictionCallback* eviction_callback) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle.release()));
+    return UniqueHandle(
+        shards_[Shard(h->hash)]->Insert(h, eviction_callback),
+        Cache::HandleDeleter(this));
+  }
+  virtual UniqueHandle Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    return UniqueHandle(
+        shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE),
+        Cache::HandleDeleter(this));
+  }
+  virtual void Release(Handle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    shards_[Shard(h->hash)]->Release(handle);
+  }
+  virtual void Erase(const Slice& key) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    shards_[Shard(hash)]->Erase(key, hash);
+  }
+  virtual Slice Value(const UniqueHandle& handle) const OVERRIDE {
+    return reinterpret_cast<const LRUHandle*>(handle.get())->value();
+  }
+  virtual uint8_t* MutableValue(UniquePendingHandle* handle) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(handle->get())->val_ptr();
+  }
+
+  virtual void SetMetrics(unique_ptr<CacheMetrics> metrics) OVERRIDE {
+    metrics_ = std::move(metrics);
+    for (NvmLRUCache* cache : shards_) {
+      cache->SetMetrics(metrics_.get());
+    }
+  }
+  virtual UniquePendingHandle Allocate(Slice key, int val_len, int charge) OVERRIDE {
+    int key_len = key.size();
+    DCHECK_GE(key_len, 0);
+    DCHECK_GE(val_len, 0);
+
+    // Try allocating from each of the shards -- if memkind is tight,
+    // this can cause eviction, so we might have better luck in different
+    // shards.
+    for (NvmLRUCache* cache : shards_) {
+      UniquePendingHandle ph(static_cast<PendingHandle*>(
+          cache->Allocate(sizeof(LRUHandle) + key_len + val_len)),
+          Cache::PendingHandleDeleter(this));
+      if (ph) {
+        LRUHandle* handle = reinterpret_cast<LRUHandle*>(ph.get());
+        uint8_t* buf = reinterpret_cast<uint8_t*>(ph.get());
+        handle->kv_data = &buf[sizeof(LRUHandle)];
+        handle->val_length = val_len;
+        handle->key_length = key_len;
+        handle->charge = (charge == kAutomaticCharge) ?
+            CALL_MEMKIND(memkind_malloc_usable_size, vmp_, buf) : charge;
+        handle->hash = HashSlice(key);
+        memcpy(handle->kv_data, key.data(), key.size());
+        return ph;
+      }
+    }
+    // TODO(unknown): increment a metric here on allocation failure.
+    return UniquePendingHandle(nullptr, Cache::PendingHandleDeleter(this));
+  }
+
+  virtual void Free(PendingHandle* ph) OVERRIDE {
+    CALL_MEMKIND(memkind_free, vmp_, ph);
+  }
+
+  size_t Invalidate(const InvalidationControl& ctl) override {
+    size_t invalidated_count = 0;
+    for (auto& shard: shards_) {
+      invalidated_count += shard->Invalidate(ctl);
+    }
+    return invalidated_count;
+  }
+};
+
+} // end anonymous namespace
+
+template<>
+Cache* NewCache<Cache::EvictionPolicy::LRU,
+                Cache::MemoryType::NVM>(size_t capacity, const std::string& id) {
+  std::call_once(g_memkind_ops_flag, InitMemkindOps);
+
+  // TODO(adar): we should plumb the failure up the call stack, but at the time
+  // of writing the NVM cache is only usable by the block cache, and its use of
+  // the singleton pattern prevents the surfacing of errors.
+  CHECK(g_memkind_available) << "Memkind not available!";
+
+  // memkind_create_pmem() will fail if the capacity is too small, but with
+  // an inscrutable error. So, we'll check ourselves.
+  CHECK_GE(capacity, MEMKIND_PMEM_MIN_SIZE)
+    << "configured capacity " << capacity << " bytes is less than "
+    << "the minimum capacity for an NVM cache: " << MEMKIND_PMEM_MIN_SIZE;
+
+  memkind* vmp;
+  int err = CALL_MEMKIND(memkind_create_pmem, FLAGS_nvm_cache_path.c_str(), capacity, &vmp);
+  // If we cannot create the cache pool we should not retry.
+  PLOG_IF(FATAL, err) << "Could not initialize NVM cache library in path "
+                           << FLAGS_nvm_cache_path.c_str();
+
+  return new ShardedLRUCache(capacity, id, vmp);
+}
+
+bool CanUseNVMCacheForTests() {
+  std::call_once(g_memkind_ops_flag, InitMemkindOps);
+  return g_memkind_available;
+}
+
+} // namespace kudu
diff --git a/be/src/util/cache/nvm_cache.h b/be/src/util/cache/nvm_cache.h
new file mode 100644
index 0000000..d6c2762
--- /dev/null
+++ b/be/src/util/cache/nvm_cache.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstddef>
+#include <string>
+
+#include "kudu/util/cache.h"
+
+namespace kudu {
+
+// Convenience macro for invoking CanUseNVMCacheForTests.
+#define RETURN_IF_NO_NVM_CACHE(memory_type) do { \
+  if ((memory_type) == Cache::MemoryType::NVM && !CanUseNVMCacheForTests()) { \
+    LOG(WARNING) << "test is skipped; NVM cache cannot be created"; \
+    return; \
+  } \
+} while (0)
+
+// Returns true if an NVM cache can be created on this machine; false otherwise.
+//
+// Only intended for use in unit tests.
+bool CanUseNVMCacheForTests();
+
+// Create a new LRU cache with a fixed size capacity. This implementation
+// of Cache uses the least-recently-used eviction policy and stored in NVM.
+template<>
+Cache* NewCache<Cache::EvictionPolicy::LRU,
+                Cache::MemoryType::NVM>(size_t capacity, const std::string& id);
+
+}  // namespace kudu
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 6aef096..495fdcb 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -76,6 +76,10 @@ be/src/kudu/util/status.cc
 be/src/kudu/util/status.h
 be/src/kudu/security/x509_check_host.h
 be/src/kudu/security/x509_check_host.cc
+be/src/util/cache/cache.h
+be/src/util/cache/cache.cc
+be/src/util/cache/nvm_cache.cc
+be/src/util/cache/cache-test.cc
 
 # http://www.apache.org/legal/src-headers.html: "Short informational text files; for
 # example README, INSTALL files. The expectation is that these files make it obvious which