Posted to commits@impala.apache.org by jo...@apache.org on 2020/03/31 17:09:27 UTC

[impala] branch master updated: IMPALA-8690: Add LIRS cache eviction algorithm

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 01691b9  IMPALA-8690: Add LIRS cache eviction algorithm
01691b9 is described below

commit 01691b998a96fd29575a3c3912377bd18fdd712f
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Mon Feb 24 09:54:04 2020 -0800

    IMPALA-8690: Add LIRS cache eviction algorithm
    
    One concern for the data cache is that the LRU eviction
    algorithm is susceptible to being flushed by large
    scans of low-priority data. This implements the LIRS algorithm
    described in "LIRS: An Efficient Low Inter-reference Recency
    Set Replacement Policy to Improve Buffer Cache Performance"
    by Song Jiang and Xiaodong Zhang (2002). LIRS is a scan-resistant
    eviction algorithm with a low performance penalty relative to LRU.
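
    As a toy illustration of the scan-flushing problem (this sketch is
    not code from this change; the class, sizes, and values below are
    made up for the example), a small hot working set in an LRU cache
    is completely evicted by a single large sequential scan:

      // Illustrative only: a minimal LRU cache over integer keys with
      // unit-sized entries.
      #include <cstdint>
      #include <iostream>
      #include <list>
      #include <unordered_map>

      class ToyLru {
       public:
        explicit ToyLru(size_t capacity) : capacity_(capacity) {}

        // Returns true on a hit; a hit or an insert makes the key the
        // most recently used entry.
        bool Access(int64_t key) {
          auto it = index_.find(key);
          if (it != index_.end()) {
            recency_.splice(recency_.begin(), recency_, it->second);
            return true;
          }
          if (recency_.size() == capacity_) {
            index_.erase(recency_.back());  // evict least recently used
            recency_.pop_back();
          }
          recency_.push_front(key);
          index_[key] = recency_.begin();
          return false;
        }

       private:
        size_t capacity_;
        std::list<int64_t> recency_;  // front = most recently used
        std::unordered_map<int64_t, std::list<int64_t>::iterator> index_;
      };

      int main() {
        ToyLru cache(100);
        // Warm a hot set that fits comfortably in the cache.
        for (int64_t k = 0; k < 50; ++k) cache.Access(k);
        // One pass over low-priority data larger than the cache...
        for (int64_t k = 1000; k < 2000; ++k) cache.Access(k);
        // ...flushes the hot set completely under LRU.
        int hits = 0;
        for (int64_t k = 0; k < 50; ++k) hits += cache.Access(k);
        std::cout << "hot-set hits after scan: " << hits << " / 50\n";
        return 0;
      }

    A scan-resistant policy such as LIRS keeps most of such a hot set
    resident in the same scenario, because blocks touched only once
    during the scan are not allowed to displace blocks with low
    inter-reference recency.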
    
    This introduces the startup flag data_cache_eviction_policy to
    control which eviction policy to use. The only two options are
    LRU and LIRS, with the default continuing to be LRU.
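
    For illustration, the policy is selected at impalad startup along
    with the existing data cache location/size configuration. The
    directory and size below are placeholders, and --data_cache is the
    pre-existing flag that enables the cache itself, not part of this
    change:

      impalad --data_cache=/mnt/ssd1/datacache:500GB \
              --data_cache_eviction_policy=LIRS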
    
    To accommodate the new algorithm and associated tests, some
    code moved around:
    1. The RLCacheShard implementation moved from util/cache/cache.cc
       to util/cache/rl-cache.cc.
    2. The backend cache tests were split into multiple files.
       util/cache/cache-test.h contains shared cache testing code.
       util/cache/cache-test.cc contains generic tests that should
       work for any algorithm.
       util/cache/rl-cache-test.cc contains RLCacheShard-specific tests.
       util/cache/lirs-cache-test.cc contains LIRS-specific tests.
    3. To make it easy for clients of the cache code to customize
       the cache eviction algorithm, the public interface changed
       from using a template to taking the policy as an argument
       (see the sketch after this list).
    4. Cache::MemoryType is removed.
    5. Cache adds an Init() method to verify the validity of
       startup flags.
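
    For illustration, a minimal sketch of the new client-facing calls,
    based on the usage visible in cache-bench.cc and data-cache.cc in
    this change (the capacity and cache id below are placeholders):

      // Sketch only: the eviction policy is now a runtime argument
      // rather than a template parameter, and Init() must be called
      // before use so the implementation can validate its startup flags.
      #include <memory>
      #include <string>

      #include "common/status.h"
      #include "util/cache/cache.h"

      std::unique_ptr<impala::Cache> MakeExampleCache(
          const std::string& policy_str) {
        impala::Cache::EvictionPolicy policy =
            impala::Cache::ParseEvictionPolicy(policy_str);
        // Previously this was a template instantiation, e.g.
        // NewCache<Cache::EvictionPolicy::LRU, Cache::MemoryType::DRAM>(...).
        std::unique_ptr<impala::Cache> cache(
            impala::NewCache(policy, /* capacity */ 64 * 1024 * 1024,
                "example-cache"));
        impala::Status status = cache->Init();
        if (!status.ok()) return nullptr;
        return cache;
      }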
    
    Testing:
     - Added LIRS specific backend cache tests (lirs-cache-test)
     - Ran TPC-DS with a very small cache and concurrency to test
       corner cases with the LIRS eviction policy
     - Parameterized data-cache-test to run for both LRU and LIRS
     - Added LIRS equivalents for tests in custom_cluster/test_data_cache.py
     - Ran cache-bench with LRU and LIRS (an illustrative invocation
       is sketched after these results). The results are:
       Test case           | Algorithm | Lookups / sec | Hit rate
       ZIPFIAN ratio=1.00x | LRU       | 11.31M        | 99.9%
       ZIPFIAN ratio=1.00x | LIRS      | 10.09M        | 99.8%
       ZIPFIAN ratio=3.00x | LRU       | 11.36M        | 95.9%
       ZIPFIAN ratio=3.00x | LIRS      |  9.27M        | 96.4%
       UNIFORM ratio=1.00x | LRU       |  7.46M        | 99.8%
       UNIFORM ratio=1.00x | LIRS      |  6.93M        | 99.8%
       UNIFORM ratio=3.00x | LRU       |  5.63M        | 33.3%
       UNIFORM ratio=3.00x | LIRS      |  3.24M        | 33.3%
       The takeaway is that LIRS is a bit slower on lookups and
       quite a bit slower on inserts. However, both still perform
       millions of operations per second, so neither should be a
       bottleneck for the data cache.
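
       For reference, an illustrative cache-bench invocation using the
       --eviction_policy flag added in this change (the binary path,
       thread count, and duration are examples, not necessarily the
       settings used for the numbers above):

         ${IMPALA_HOME}/be/build/latest/util/cache/cache-bench \
             --eviction_policy=LIRS --num_threads=16 --run_seconds=60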
    
    Change-Id: I670fa4b2b7c93998130dc4e8b2546bb93e9a84f8
    Reviewed-on: http://gerrit.cloudera.org:8080/15306
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/data-cache-test.cc        |  96 ++-
 be/src/runtime/io/data-cache.cc             |  27 +-
 be/src/runtime/io/data-cache.h              |   2 +
 be/src/util/cache/CMakeLists.txt            |   8 +-
 be/src/util/cache/cache-bench.cc            |   9 +-
 be/src/util/cache/cache-internal.h          |  48 +-
 be/src/util/cache/cache-test.cc             | 382 ++---------
 be/src/util/cache/cache-test.h              | 107 +++
 be/src/util/cache/cache.cc                  | 394 +----------
 be/src/util/cache/cache.h                   |  90 +--
 be/src/util/cache/lirs-cache-test.cc        | 557 ++++++++++++++++
 be/src/util/cache/lirs-cache.cc             | 987 ++++++++++++++++++++++++++++
 be/src/util/cache/rl-cache-test.cc          | 258 ++++++++
 be/src/util/cache/{cache.cc => rl-cache.cc} | 136 ++--
 bin/rat_exclude_files.txt                   |   5 +-
 tests/custom_cluster/test_data_cache.py     | 112 +++-
 16 files changed, 2295 insertions(+), 923 deletions(-)

diff --git a/be/src/runtime/io/data-cache-test.cc b/be/src/runtime/io/data-cache-test.cc
index b335677..6ea4ca5 100644
--- a/be/src/runtime/io/data-cache-test.cc
+++ b/be/src/runtime/io/data-cache-test.cc
@@ -37,13 +37,21 @@
 #include "common/names.h"
 
 #define BASE_CACHE_DIR     "/tmp"
-#define DEFAULT_CACHE_SIZE (4 * 1024 * 1024)
 #define FNAME              ("foobar")
 #define NUM_TEST_DIRS      (16)
 #define NUM_THREADS        (18)
 #define MTIME              (12345)
 #define TEMP_BUFFER_SIZE   (4096)
 #define TEST_BUFFER_SIZE   (8192)
+#define NUM_CACHE_ENTRIES  (1024)
+#define DEFAULT_CACHE_SIZE (NUM_CACHE_ENTRIES * TEMP_BUFFER_SIZE)
+
+// The NUM_CACHE_ENTRIES_NO_EVICT says how many entries can be inserted into the cache
+// and queried without any evictions. This is smaller than NUM_CACHE_ENTRIES to
+// provide a bit of flexibility for cache implementations like LIRS that have multiple
+// segments. The segments may not split cleanly along entries of size TEMP_BUFFER_SIZE,
+// so it can evict something when adding NUM_CACHE_ENTRIES.
+#define NUM_CACHE_ENTRIES_NO_EVICT (NUM_CACHE_ENTRIES - 1)
 
 DECLARE_bool(cache_force_single_shard);
 DECLARE_bool(data_cache_anonymize_trace);
@@ -51,11 +59,12 @@ DECLARE_bool(data_cache_enable_tracing);
 DECLARE_int64(data_cache_file_max_size_bytes);
 DECLARE_int32(data_cache_max_opened_files);
 DECLARE_int32(data_cache_write_concurrency);
+DECLARE_string(data_cache_eviction_policy);
 
 namespace impala {
 namespace io {
 
-class DataCacheTest : public testing::Test {
+class DataCacheBaseTest : public testing::Test {
  public:
   const uint8_t* test_buffer() {
     return reinterpret_cast<const uint8_t*>(test_buffer_);
@@ -91,7 +100,7 @@ class DataCacheTest : public testing::Test {
       num_misses[i] = 0;
       string thread_name = Substitute("thread-$0", i);
       ASSERT_OK(Thread::Create("data-cache-test", thread_name,
-          boost::bind(&DataCacheTest::ThreadFn, this,
+          boost::bind(&DataCacheBaseTest::ThreadFn, this,
              use_per_thread_filename ? thread_name : "", cache, max_start_offset,
              &barrier, &num_misses[i]), &thread));
       threads.emplace_back(std::move(thread));
@@ -112,7 +121,7 @@ class DataCacheTest : public testing::Test {
   }
 
  protected:
-  DataCacheTest() {
+  DataCacheBaseTest() {
     // Create a buffer of random characters.
     for (int i = 0; i < TEST_BUFFER_SIZE; ++i) {
       test_buffer_[i] = '!' + (rand() % 93);
@@ -120,7 +129,7 @@ class DataCacheTest : public testing::Test {
   }
 
   // Create a bunch of test directories in which the data cache will reside.
-  virtual void SetUp() {
+  void SetupWithParameters(std::string eviction_policy) {
     test_env_.reset(new TestEnv());
     flag_saver_.reset(new google::FlagSaver());
     ASSERT_OK(test_env_->Init());
@@ -137,6 +146,9 @@ class DataCacheTest : public testing::Test {
 
     // Allows full write concurrency for the multi-threaded tests.
     FLAGS_data_cache_write_concurrency = NUM_THREADS;
+
+    // This is parameterized to allow testing LRU and LIRS
+    FLAGS_data_cache_eviction_policy = eviction_policy;
   }
 
   // Delete all the test directories created.
@@ -207,10 +219,27 @@ class DataCacheTest : public testing::Test {
   }
 };
 
+class DataCacheTest :
+    public DataCacheBaseTest,
+    public ::testing::WithParamInterface<std::string> {
+ public:
+  DataCacheTest()
+      : DataCacheBaseTest() {
+  }
+
+  void SetUp() override {
+    const auto& param = GetParam();
+    SetupWithParameters(param);
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(DataCacheTestTypes, DataCacheTest,
+    ::testing::Values("LRU", "LIRS"));
+
 // This test exercises the basic insertion and lookup paths by inserting a known set of
 // offsets which fit in the cache entirely. Also tries reading entries which are never
 // inserted into the cache.
-TEST_F(DataCacheTest, TestBasics) {
+TEST_P(DataCacheTest, TestBasics) {
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
   DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
   ASSERT_OK(cache.Init());
@@ -220,7 +249,7 @@ TEST_F(DataCacheTest, TestBasics) {
   // Read and then insert a range of offsets. Expected all misses in the first iteration
   // and all hits in the second iteration.
   for (int i = 0; i < 2; ++i) {
-    for (int64_t offset = 0; offset < 1024; ++offset) {
+    for (int64_t offset = 0; offset < NUM_CACHE_ENTRIES_NO_EVICT; ++offset) {
       int expected_bytes = i * TEMP_BUFFER_SIZE;
       memset(buffer, 0, TEMP_BUFFER_SIZE);
       ASSERT_EQ(expected_bytes,
@@ -235,24 +264,27 @@ TEST_F(DataCacheTest, TestBasics) {
   }
 
   // Read the same range inserted previously but with a different filename.
-  for (int64_t offset = 1024; offset < TEST_BUFFER_SIZE; ++offset) {
+  for (int64_t offset = NUM_CACHE_ENTRIES_NO_EVICT; offset < TEST_BUFFER_SIZE;
+       ++offset) {
     const string& alt_fname = "random";
     ASSERT_EQ(0, cache.Lookup(alt_fname, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
   }
 
   // Read the same range inserted previously but with a different mtime.
-  for (int64_t offset = 1024; offset < TEST_BUFFER_SIZE; ++offset) {
+  for (int64_t offset = NUM_CACHE_ENTRIES_NO_EVICT; offset < TEST_BUFFER_SIZE;
+       ++offset) {
     int64_t alt_mtime = 67890;
     ASSERT_EQ(0, cache.Lookup(FNAME, alt_mtime, offset, TEMP_BUFFER_SIZE, buffer));
   }
 
   // Read a range of offsets which should miss in the cache.
-  for (int64_t offset = 1024; offset < TEST_BUFFER_SIZE; ++offset) {
+  for (int64_t offset = NUM_CACHE_ENTRIES_NO_EVICT; offset < TEST_BUFFER_SIZE;
+       ++offset) {
     ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
   }
 
   // Read the same same range inserted previously. They should still all be in the cache.
-  for (int64_t offset = 0; offset < 1024; ++offset) {
+  for (int64_t offset = 0; offset < NUM_CACHE_ENTRIES_NO_EVICT; ++offset) {
     memset(buffer, 0, TEMP_BUFFER_SIZE);
     ASSERT_EQ(TEMP_BUFFER_SIZE,
         cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE + 10, buffer));
@@ -295,17 +327,17 @@ TEST_F(DataCacheTest, TestBasics) {
 
 // Tests backing file rotation by setting FLAGS_data_cache_file_max_size_bytes to be 1/4
 // of the cache size. This forces rotation of backing files.
-TEST_F(DataCacheTest, RotateFiles) {
+TEST_P(DataCacheTest, RotateFiles) {
   // Set the maximum size of backing files to be 1/4 of the cache size.
-  FLAGS_data_cache_file_max_size_bytes = 1024 * 1024;
-  const int64_t cache_size = 4 * FLAGS_data_cache_file_max_size_bytes ;
+  FLAGS_data_cache_file_max_size_bytes = DEFAULT_CACHE_SIZE / 4;
+  const int64_t cache_size = DEFAULT_CACHE_SIZE;
   DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
   ASSERT_OK(cache.Init());
 
   // Read and then insert a range of offsets. Expected all misses in the first iteration
   // and all hits in the second iteration.
   for (int i = 0; i < 2; ++i) {
-    for (int64_t offset = 0; offset < 1024; ++offset) {
+    for (int64_t offset = 0; offset < NUM_CACHE_ENTRIES_NO_EVICT; ++offset) {
       int expected_bytes = i * TEMP_BUFFER_SIZE;
       uint8_t buffer[TEMP_BUFFER_SIZE];
       memset(buffer, 0, TEMP_BUFFER_SIZE);
@@ -334,13 +366,13 @@ TEST_F(DataCacheTest, RotateFiles) {
 // of the cache size. This forces rotation of backing files. Also sets
 // --data_cache_max_opened_files to 1 so that only one underlying
 // file is allowed. This exercises the lazy deletion path.
-TEST_F(DataCacheTest, RotateAndDeleteFiles) {
+TEST_P(DataCacheTest, RotateAndDeleteFiles) {
   // Set the maximum size of backing files to be 1/4 of the cache size.
-  FLAGS_data_cache_file_max_size_bytes = 1024 * 1024;
+  FLAGS_data_cache_file_max_size_bytes = DEFAULT_CACHE_SIZE / 4;
   // Force to allow one backing file.
   FLAGS_data_cache_max_opened_files = 1;
 
-  const int64_t cache_size = 4 * FLAGS_data_cache_file_max_size_bytes ;
+  const int64_t cache_size = DEFAULT_CACHE_SIZE;
   DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
   ASSERT_OK(cache.Init());
 
@@ -360,12 +392,18 @@ TEST_F(DataCacheTest, RotateAndDeleteFiles) {
   // is still in the cache.
   int64_t in_cache_offset =
       1024 - FLAGS_data_cache_file_max_size_bytes / TEMP_BUFFER_SIZE;
+  int64_t num_entries_hit = 0;
   for (int64_t offset = in_cache_offset ; offset < 1024; ++offset) {
     memset(buffer, 0, TEMP_BUFFER_SIZE);
-    ASSERT_EQ(TEMP_BUFFER_SIZE, cache.Lookup(FNAME, MTIME, offset,
-        TEMP_BUFFER_SIZE, buffer));
-    ASSERT_EQ(0, memcmp(buffer, test_buffer() + offset, TEMP_BUFFER_SIZE));
+    int64_t lookup_size = cache.Lookup(FNAME, MTIME, offset,
+        TEMP_BUFFER_SIZE, buffer);
+    if (lookup_size != 0) {
+      ++num_entries_hit;
+      EXPECT_EQ(TEMP_BUFFER_SIZE, lookup_size);
+      EXPECT_EQ(0, memcmp(buffer, test_buffer() + offset, TEMP_BUFFER_SIZE));
+    }
   }
+  EXPECT_GE(num_entries_hit, NUM_CACHE_ENTRIES / 4 - 1);
 
   // Make sure only one backing file exists. Allow for 10 seconds latency for the
   // file deleter thread to run.
@@ -385,7 +423,9 @@ TEST_F(DataCacheTest, RotateAndDeleteFiles) {
 
 // Tests eviction in the cache by inserting a large entry which evicts all existing
 // entries in the cache.
-TEST_F(DataCacheTest, Eviction) {
+TEST_P(DataCacheTest, LRUEviction) {
+  // This test is specific to LRU
+  if (FLAGS_data_cache_eviction_policy != "LRU") return;
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
   DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
   ASSERT_OK(cache.Init());
@@ -438,12 +478,12 @@ TEST_F(DataCacheTest, Eviction) {
 // Tests insertion and lookup with the cache with multiple threads.
 // Inserts a working set which will fit in the cache. Despite potential
 // collision during insertion, all entries in the working set should be found.
-TEST_F(DataCacheTest, MultiThreadedNoMisses) {
+TEST_P(DataCacheTest, MultiThreadedNoMisses) {
   int64_t cache_size = DEFAULT_CACHE_SIZE;
   DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
   ASSERT_OK(cache.Init());
 
-  int64_t max_start_offset = 1024;
+  int64_t max_start_offset = NUM_CACHE_ENTRIES_NO_EVICT;
   bool use_per_thread_filename = false;
   bool expect_misses = false;
   MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
@@ -452,7 +492,7 @@ TEST_F(DataCacheTest, MultiThreadedNoMisses) {
 
 // Inserts a working set which is known to be larger than the cache's capacity.
 // Expect some cache misses in lookups.
-TEST_F(DataCacheTest, MultiThreadedWithMisses) {
+TEST_P(DataCacheTest, MultiThreadedWithMisses) {
   int64_t cache_size = DEFAULT_CACHE_SIZE;
   DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
   ASSERT_OK(cache.Init());
@@ -465,7 +505,7 @@ TEST_F(DataCacheTest, MultiThreadedWithMisses) {
 }
 
 // Test insertion and lookup with a cache configured with multiple partitions.
-TEST_F(DataCacheTest, MultiPartitions) {
+TEST_P(DataCacheTest, MultiPartitions) {
   StringPiece delimiter(",");
   string cache_base = JoinStrings(data_cache_dirs(), delimiter);
   const int64_t cache_size = DEFAULT_CACHE_SIZE;
@@ -482,7 +522,7 @@ TEST_F(DataCacheTest, MultiPartitions) {
 // Tests insertion of a working set whose size is 1/8 of the total memory size.
 // This likely exceeds the size of the page cache and forces write back of dirty pages in
 // the page cache to the backing files and also read from the backing files during lookup.
-TEST_F(DataCacheTest, LargeFootprint) {
+TEST_P(DataCacheTest, LargeFootprint) {
   struct sysinfo info;
   ASSERT_EQ(0, sysinfo(&info));
   ASSERT_GT(info.totalram, 0);
@@ -505,7 +545,7 @@ TEST_F(DataCacheTest, LargeFootprint) {
   }
 }
 
-TEST_F(DataCacheTest, TestAccessTrace) {
+TEST_P(DataCacheTest, TestAccessTrace) {
   FLAGS_data_cache_enable_tracing = true;
   for (bool anon : { false, true }) {
     SCOPED_TRACE(anon);
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 4f74566..71edb78 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -17,7 +17,6 @@
 
 #include "runtime/io/data-cache.h"
 
-#include <boost/algorithm/string.hpp>
 #include <errno.h>
 #include <fcntl.h>
 #include <mutex>
@@ -91,6 +90,10 @@ DEFINE_bool(data_cache_anonymize_trace, false,
     "(Advanced) Use hashes of filenames rather than file paths in the data "
     "cache access trace.");
 
+DEFINE_string(data_cache_eviction_policy, "LRU",
+    "(Advanced) The cache eviction policy to use for the data cache. "
+    "Either 'LRU' (default) or 'LIRS' (experimental)");
+
 namespace impala {
 namespace io {
 
@@ -440,14 +443,21 @@ struct DataCache::CacheKey {
   faststring key_;
 };
 
+static Cache::EvictionPolicy GetCacheEvictionPolicy(const std::string& policy_string) {
+  Cache::EvictionPolicy policy = Cache::ParseEvictionPolicy(policy_string);
+  if (policy != Cache::EvictionPolicy::LRU && policy != Cache::EvictionPolicy::LIRS) {
+    LOG(FATAL) << "Unsupported eviction policy: " << policy_string;
+  }
+  return policy;
+}
+
 DataCache::Partition::Partition(
     const string& path, int64_t capacity, int max_opened_files)
   : path_(path),
     capacity_(max<int64_t>(capacity, PAGE_SIZE)),
     max_opened_files_(max_opened_files),
-    meta_cache_(
-        NewCache<Cache::EvictionPolicy::LRU, Cache::MemoryType::DRAM>(
-            capacity_, path_)) {}
+    meta_cache_(NewCache(GetCacheEvictionPolicy(FLAGS_data_cache_eviction_policy),
+        capacity_, path_)) {}
 
 DataCache::Partition::~Partition() {
   if (!closed_) ReleaseResources();
@@ -482,6 +492,8 @@ Status DataCache::Partition::DeleteExistingFiles() const {
 Status DataCache::Partition::Init() {
   std::unique_lock<SpinLock> partition_lock(lock_);
 
+  RETURN_IF_ERROR(meta_cache_->Init());
+
   // Verify the validity of the path specified.
   if (!FileSystemUtil::IsCanonicalPath(path_)) {
     return Status(Substitute("$0 is not a canonical path", path_));
@@ -553,7 +565,7 @@ int64_t DataCache::Partition::Lookup(const CacheKey& cache_key, int64_t bytes_to
     uint8_t* buffer) {
   DCHECK(!closed_);
   Slice key = cache_key.ToSlice();
-  Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
+  Cache::UniqueHandle handle(meta_cache_->Lookup(key));
 
   if (handle.get() == nullptr) {
     if (tracer_ != nullptr) {
@@ -629,7 +641,8 @@ bool DataCache::Partition::InsertIntoCache(const Slice& key, CacheFile* cache_fi
   CacheEntry entry(cache_file, insertion_offset, buffer_len, checksum);
   memcpy(meta_cache_->MutableValue(&pending_handle), &entry, sizeof(CacheEntry));
   Cache::UniqueHandle handle(meta_cache_->Insert(std::move(pending_handle), this));
-  pending_handle = nullptr;
+  // Check for failure of Insert
+  if (UNLIKELY(handle.get() == nullptr)) return false;
   ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->Increment(charge_len);
   return true;
 }
@@ -644,7 +657,7 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe
 
   // Check for existing entry.
   {
-    Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
+    Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::NO_UPDATE));
     if (handle.get() != nullptr) {
       if (HandleExistingEntry(key, handle, buffer, buffer_len)) return false;
     }
diff --git a/be/src/runtime/io/data-cache.h b/be/src/runtime/io/data-cache.h
index 1dd3e38..beb83eb 100644
--- a/be/src/runtime/io/data-cache.h
+++ b/be/src/runtime/io/data-cache.h
@@ -179,6 +179,7 @@ class DataCache {
   Status CloseFilesAndVerifySizes();
 
  private:
+  friend class DataCacheBaseTest;
   friend class DataCacheTest;
   FRIEND_TEST(DataCacheTest, TestAccessTrace);
 
@@ -245,6 +246,7 @@ class DataCache {
     void DeleteOldFiles();
 
    private:
+    friend class DataCacheBaseTest;
     friend class DataCacheTest;
     FRIEND_TEST(DataCacheTest, TestAccessTrace);
     class Tracer;
diff --git a/be/src/util/cache/CMakeLists.txt b/be/src/util/cache/CMakeLists.txt
index cb3595c..7cc1999 100644
--- a/be/src/util/cache/CMakeLists.txt
+++ b/be/src/util/cache/CMakeLists.txt
@@ -23,15 +23,21 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/util/cache")
 
 add_library(UtilCache
   cache.cc
+  lirs-cache.cc
+  rl-cache.cc
 )
 add_dependencies(UtilCache gen-deps gen_ir_descriptions)
 
 add_library(UtilCacheTests STATIC
   cache-test.cc
+  lirs-cache-test.cc
+  rl-cache-test.cc
 )
 add_dependencies(UtilCacheTests gen-deps gen_ir_descriptions)
 
 add_executable(cache-bench cache-bench.cc)
 target_link_libraries(cache-bench ${IMPALA_TEST_LINK_LIBS})
 
-ADD_UNIFIED_BE_LSAN_TEST(cache-test "CacheTypes/CacheTest.*:CacheTypes/LRUCacheTest.*:FIFOCacheTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(cache-test "CacheTypes/CacheTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(lirs-cache-test "LIRSCacheTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(rl-cache-test "CacheTypes/CacheInvalidationTest.*:CacheTypes/LRUCacheTest.*:FIFOCacheTest.*")
diff --git a/be/src/util/cache/cache-bench.cc b/be/src/util/cache/cache-bench.cc
index ec26484..fe9cbde 100644
--- a/be/src/util/cache/cache-bench.cc
+++ b/be/src/util/cache/cache-bench.cc
@@ -29,6 +29,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "common/status.h"
 #include "kudu/gutil/bits.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/stringprintf.h"
@@ -42,6 +43,7 @@
 
 DEFINE_int32(num_threads, 16, "The number of threads to access the cache concurrently.");
 DEFINE_int32(run_seconds, 1, "The number of seconds to run the benchmark");
+DEFINE_string(eviction_policy, "LRU", "The eviction policy to use for the cache.");
 
 using std::atomic;
 using std::pair;
@@ -94,7 +96,10 @@ class CacheBench : public testing::Test,
                    public testing::WithParamInterface<BenchSetup>{
  public:
   void SetUp() override {
-    cache_.reset(NewCache(kCacheCapacity, "test-cache"));
+    cache_.reset(NewCache(Cache::ParseEvictionPolicy(FLAGS_eviction_policy),
+        kCacheCapacity, "test-cache"));
+    Status status = cache_->Init();
+    ASSERT_OK(status);
   }
 
   // Run queries against the cache until '*done' becomes true.
@@ -117,7 +122,7 @@ class CacheBench : public testing::Test,
       char key_buf[sizeof(int_key)];
       memcpy(key_buf, &int_key, sizeof(int_key));
       Slice key_slice(key_buf, arraysize(key_buf));
-      auto h(cache_->Lookup(key_slice, Cache::EXPECT_IN_CACHE));
+      auto h(cache_->Lookup(key_slice));
       if (h) {
         ++hits;
       } else {
diff --git a/be/src/util/cache/cache-internal.h b/be/src/util/cache/cache-internal.h
index a2bbacb..e46a97a 100644
--- a/be/src/util/cache/cache-internal.h
+++ b/be/src/util/cache/cache-internal.h
@@ -6,6 +6,7 @@
 
 #include <vector>
 
+#include "common/status.h"
 #include "kudu/gutil/bits.h"
 #include "kudu/gutil/hash/city.h"
 #include "kudu/gutil/stl_util.h"
@@ -50,6 +51,10 @@ class HandleBase {
     }
   }
 
+  ~HandleBase() {
+    DCHECK(next_handle_ == nullptr);
+  }
+
   Slice key() const {
     return Slice(kv_data_ptr_, key_length_);
   }
@@ -111,6 +116,8 @@ class HandleTable {
         // average linked list length (<= 1).
         Resize();
       }
+    } else {
+      old->next_handle_ = nullptr;
     }
     return old;
   }
@@ -120,6 +127,7 @@ class HandleTable {
     HandleBase* result = *ptr;
     if (result != nullptr) {
       *ptr = result->next_handle_;
+      result->next_handle_ = nullptr;
       --elems_;
     }
     return result;
@@ -152,7 +160,7 @@ class HandleTable {
     auto new_list = new HandleBase*[new_length];
     memset(new_list, 0, sizeof(new_list[0]) * new_length);
     uint32_t count = 0;
-    for (uint32_t i = 0; i < length_; i++) {
+    for (uint32_t i = 0; i < length_; ++i) {
       HandleBase* h = list_[i];
       while (h != nullptr) {
         HandleBase* next = h->next_handle_;
@@ -161,7 +169,7 @@ class HandleTable {
         h->next_handle_ = *ptr;
         *ptr = h;
         h = next;
-        count++;
+        ++count;
       }
     }
     DCHECK_EQ(elems_, count);
@@ -186,22 +194,30 @@ class CacheShard {
  public:
   virtual ~CacheShard() {}
 
-  // Separate from constructor so caller can easily make an array of CacheShard
-  virtual void SetCapacity(size_t capacity) = 0;
+  // Initialization function. Must be called before any other function.
+  virtual Status Init() = 0;
 
   virtual HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) = 0;
   virtual void Free(HandleBase* handle) = 0;
   virtual HandleBase* Insert(HandleBase* handle,
       Cache::EvictionCallback* eviction_callback) = 0;
-  virtual HandleBase* Lookup(const Slice& key, uint32_t hash, bool caching) = 0;
+  virtual HandleBase* Lookup(const Slice& key, uint32_t hash, bool no_updates) = 0;
   virtual void Release(HandleBase* handle) = 0;
   virtual void Erase(const Slice& key, uint32_t hash) = 0;
   virtual size_t Invalidate(const Cache::InvalidationControl& ctl) = 0;
 };
 
 // Function to build a cache shard using the given eviction algorithm.
+// This untemplatized function allows ShardedCache to avoid templating and
+// keeps the templates internal to the cache.
+CacheShard* NewCacheShard(Cache::EvictionPolicy eviction_policy,
+    kudu::MemTracker* mem_tracker, size_t capacity);
+
+// Internal templatized function to build a cache shard for the given eviction
+// algorithm. The template argument allows the different eviction policy implementations
+// to be defined in different files (e.g. LRU in rl-cache.cc and LIRS in lirs-cache.cc).
 template <Cache::EvictionPolicy eviction_policy>
-CacheShard* NewCacheShard(kudu::MemTracker* mem_tracker);
+CacheShard* NewCacheShardInt(kudu::MemTracker* mem_tracker, size_t capacity);
 
 // Determine the number of bits of the hash that should be used to determine
 // the cache shard. This, in turn, determines the number of shards.
@@ -214,10 +230,9 @@ string ToString(Cache::EvictionPolicy p);
 // through to the underlying CacheShard unless the function can be answered by the
 // HandleBase itself. The number of shards is currently a power of 2 so that it can
 // do a right shift to get the shard index.
-template <Cache::EvictionPolicy policy>
 class ShardedCache : public Cache {
  public:
-  explicit ShardedCache(size_t capacity, const string& id)
+  explicit ShardedCache(Cache::EvictionPolicy policy, size_t capacity, const string& id)
       : shard_bits_(DetermineShardBits()) {
     // A cache is often a singleton, so:
     // 1. We reuse its MemTracker if one already exists, and
@@ -229,10 +244,8 @@ class ShardedCache : public Cache {
 
     int num_shards = 1 << shard_bits_;
     const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
-    for (int s = 0; s < num_shards; s++) {
-      unique_ptr<CacheShard> shard(NewCacheShard<policy>(mem_tracker_.get()));
-      shard->SetCapacity(per_shard);
-      shards_.push_back(shard.release());
+    for (int s = 0; s < num_shards; ++s) {
+      shards_.push_back(NewCacheShard(policy, mem_tracker_.get(), per_shard));
     }
   }
 
@@ -240,9 +253,16 @@ class ShardedCache : public Cache {
     STLDeleteElements(&shards_);
   }
 
-  UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override {
+  Status Init() override {
+    for (CacheShard* shard : shards_) {
+      RETURN_IF_ERROR(shard->Init());
+    }
+    return Status::OK();
+  }
+
+  UniqueHandle Lookup(const Slice& key, LookupBehavior behavior) override {
     const uint32_t hash = HashSlice(key);
-    HandleBase* h = shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE);
+    HandleBase* h = shards_[Shard(hash)]->Lookup(key, hash, behavior == NO_UPDATE);
     return UniqueHandle(reinterpret_cast<Cache::Handle*>(h), Cache::HandleDeleter(this));
   }
 
diff --git a/be/src/util/cache/cache-test.cc b/be/src/util/cache/cache-test.cc
index 265b0b9..708cc57 100644
--- a/be/src/util/cache/cache-test.cc
+++ b/be/src/util/cache/cache-test.cc
@@ -3,27 +3,19 @@
 // found in the LICENSE file.
 
 #include "util/cache/cache.h"
+#include "util/cache/cache-test.h"
 
-#include <cstring>
 #include <memory>
-#include <string>
+#include <tuple>
 #include <utility>
-#include <vector>
 
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/coding.h"
-#include "kudu/util/env.h"
-#include "kudu/util/faststring.h"
 #include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/slice.h"
+#include "testutil/gtest-util.h"
 
 DECLARE_bool(cache_force_single_shard);
 
@@ -31,124 +23,44 @@ DECLARE_double(cache_memtracker_approximation_ratio);
 
 using std::make_tuple;
 using std::tuple;
-using std::shared_ptr;
-using std::unique_ptr;
-using std::vector;
-using strings::Substitute;
 
 namespace impala {
 
-// Conversions between numeric keys/values and the types expected by Cache.
-static std::string EncodeInt(int k) {
-  kudu::faststring result;
-  kudu::PutFixed32(&result, k);
-  return result.ToString();
+void CacheBaseTest::SetupWithParameters(Cache::EvictionPolicy eviction_policy,
+    ShardingPolicy sharding_policy) {
+  // Disable approximate tracking of cache memory since we make specific
+  // assertions on the MemTracker in this test.
+  FLAGS_cache_memtracker_approximation_ratio = 0;
+
+  // Using single shard makes the logic of scenarios simple for capacity-
+  // and eviction-related behavior.
+  FLAGS_cache_force_single_shard =
+    (sharding_policy == ShardingPolicy::SingleShard);
+
+  switch (eviction_policy) {
+    case Cache::EvictionPolicy::FIFO:
+      cache_.reset(NewCache(Cache::EvictionPolicy::FIFO, cache_size(), "cache_test"));
+      kudu::MemTracker::FindTracker("cache_test-sharded_fifo_cache", &mem_tracker_);
+      break;
+    case Cache::EvictionPolicy::LRU:
+      cache_.reset(NewCache(Cache::EvictionPolicy::LRU, cache_size(), "cache_test"));
+      kudu::MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
+      break;
+    case Cache::EvictionPolicy::LIRS:
+      cache_.reset(NewCache(Cache::EvictionPolicy::LIRS, cache_size(), "cache_test"));
+      kudu::MemTracker::FindTracker("cache_test-sharded_lirs_cache", &mem_tracker_);
+      break;
+    default:
+      FAIL() << "unrecognized cache eviction policy";
+  }
+  ASSERT_TRUE(mem_tracker_.get());
+  Status status = cache_->Init();
+  ASSERT_OK(status);
 }
-static int DecodeInt(const Slice& k) {
-  CHECK_EQ(4, k.size());
-  return kudu::DecodeFixed32(k.data());
-}
-
-// Cache sharding policy affects the composition of the cache. Some test
-// scenarios assume cache is single-sharded to keep the logic simpler.
-enum class ShardingPolicy {
-  MultiShard,
-  SingleShard,
-};
-
-class CacheBaseTest : public ::testing::Test,
-                      public Cache::EvictionCallback {
- public:
-  explicit CacheBaseTest(size_t cache_size)
-      : cache_size_(cache_size) {
-  }
-
-  size_t cache_size() const {
-    return cache_size_;
-  }
-
-  // Implementation of the EvictionCallback interface.
-  void EvictedEntry(Slice key, Slice val) override {
-    evicted_keys_.push_back(DecodeInt(key));
-    evicted_values_.push_back(DecodeInt(val));
-  }
-
-  int Lookup(int key) {
-    auto handle(cache_->Lookup(EncodeInt(key), Cache::EXPECT_IN_CACHE));
-    return handle ? DecodeInt(cache_->Value(handle)) : -1;
-  }
-
-  void Insert(int key, int value, int charge = 1) {
-    std::string key_str = EncodeInt(key);
-    std::string val_str = EncodeInt(value);
-    auto handle(cache_->Allocate(key_str, val_str.size(), charge));
-    CHECK(handle);
-    memcpy(cache_->MutableValue(&handle), val_str.data(), val_str.size());
-    cache_->Insert(std::move(handle), this);
-  }
-
-  void Erase(int key) {
-    cache_->Erase(EncodeInt(key));
-  }
-
- protected:
-  void SetupWithParameters(Cache::MemoryType mem_type,
-                           Cache::EvictionPolicy eviction_policy,
-                           ShardingPolicy sharding_policy) {
-    // Disable approximate tracking of cache memory since we make specific
-    // assertions on the MemTracker in this test.
-    FLAGS_cache_memtracker_approximation_ratio = 0;
-
-    // Using single shard makes the logic of scenarios simple for capacity-
-    // and eviction-related behavior.
-    FLAGS_cache_force_single_shard =
-        (sharding_policy == ShardingPolicy::SingleShard);
-
-    switch (eviction_policy) {
-      case Cache::EvictionPolicy::FIFO:
-        if (mem_type != Cache::MemoryType::DRAM) {
-          FAIL() << "FIFO cache can only be of DRAM type";
-        }
-        cache_.reset(NewCache<Cache::EvictionPolicy::FIFO,
-                              Cache::MemoryType::DRAM>(cache_size(),
-                                                       "cache_test"));
-        kudu::MemTracker::FindTracker("cache_test-sharded_fifo_cache", &mem_tracker_);
-        break;
-      case Cache::EvictionPolicy::LRU:
-        switch (mem_type) {
-          case Cache::MemoryType::DRAM:
-            cache_.reset(NewCache<Cache::EvictionPolicy::LRU,
-                                  Cache::MemoryType::DRAM>(cache_size(),
-                                                           "cache_test"));
-            break;
-          default:
-            FAIL() << mem_type << ": unrecognized cache memory type";
-        }
-        kudu::MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
-        break;
-      default:
-        FAIL() << "unrecognized cache eviction policy";
-    }
-
-    // Since nvm cache does not have memtracker due to the use of
-    // tcmalloc for this we only check for it in the DRAM case.
-    if (mem_type == Cache::MemoryType::DRAM) {
-      ASSERT_TRUE(mem_tracker_.get());
-    }
-  }
-
-  const size_t cache_size_;
-  vector<int> evicted_keys_;
-  vector<int> evicted_values_;
-  shared_ptr<kudu::MemTracker> mem_tracker_;
-  unique_ptr<Cache> cache_;
-};
 
 class CacheTest :
     public CacheBaseTest,
-    public ::testing::WithParamInterface<tuple<Cache::MemoryType,
-                                               Cache::EvictionPolicy,
-                                               ShardingPolicy>> {
+    public ::testing::WithParamInterface<tuple<Cache::EvictionPolicy, ShardingPolicy>> {
  public:
   CacheTest()
       : CacheBaseTest(16 * 1024 * 1024) {
@@ -157,25 +69,22 @@ class CacheTest :
   void SetUp() override {
     const auto& param = GetParam();
     SetupWithParameters(std::get<0>(param),
-                        std::get<1>(param),
-                        std::get<2>(param));
+                        std::get<1>(param));
   }
 };
 
 INSTANTIATE_TEST_CASE_P(
     CacheTypes, CacheTest,
     ::testing::Values(
-        make_tuple(Cache::MemoryType::DRAM,
-                   Cache::EvictionPolicy::FIFO,
+        make_tuple(Cache::EvictionPolicy::FIFO,
                    ShardingPolicy::MultiShard),
-        make_tuple(Cache::MemoryType::DRAM,
-                   Cache::EvictionPolicy::FIFO,
+        make_tuple(Cache::EvictionPolicy::FIFO,
                    ShardingPolicy::SingleShard),
-        make_tuple(Cache::MemoryType::DRAM,
-                   Cache::EvictionPolicy::LRU,
+        make_tuple(Cache::EvictionPolicy::LRU,
                    ShardingPolicy::MultiShard),
-        make_tuple(Cache::MemoryType::DRAM,
-                   Cache::EvictionPolicy::LRU,
+        make_tuple(Cache::EvictionPolicy::LRU,
+                   ShardingPolicy::SingleShard),
+        make_tuple(Cache::EvictionPolicy::LIRS,
                    ShardingPolicy::SingleShard)));
 
 TEST_P(CacheTest, TrackMemory) {
@@ -232,11 +141,11 @@ TEST_P(CacheTest, Erase) {
 
 TEST_P(CacheTest, EntriesArePinned) {
   Insert(100, 101);
-  auto h1 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+  auto h1 = cache_->Lookup(EncodeInt(100));
   ASSERT_EQ(101, DecodeInt(cache_->Value(h1)));
 
   Insert(100, 102);
-  auto h2 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+  auto h2 = cache_->Lookup(EncodeInt(100));
   ASSERT_EQ(102, DecodeInt(cache_->Value(h2)));
   ASSERT_EQ(0, evicted_keys_.size());
 
@@ -267,11 +176,11 @@ TEST_P(CacheTest, HeavyEntries) {
     const int weight = (index & 1) ? kLight : kHeavy;
     Insert(index, 1000+index, weight);
     added += weight;
-    index++;
+    ++index;
   }
 
   int cached_weight = 0;
-  for (int i = 0; i < index; i++) {
+  for (int i = 0; i < index; ++i) {
     const int weight = (i & 1 ? kLight : kHeavy);
     int r = Lookup(i);
     if (r >= 0) {
@@ -282,206 +191,5 @@ TEST_P(CacheTest, HeavyEntries) {
   ASSERT_LE(cached_weight, cache_size() + cache_size() / 10);
 }
 
-TEST_P(CacheTest, InvalidateAllEntries) {
-  constexpr const int kEntriesNum = 1024;
-  // This scenarios assumes no evictions are done at the cache capacity.
-  ASSERT_LE(kEntriesNum, cache_size());
-
-  // Running invalidation on empty cache should yield no invalidated entries.
-  ASSERT_EQ(0, cache_->Invalidate({}));
-  for (auto i = 0; i < kEntriesNum; ++i) {
-    Insert(i, i);
-  }
-  // Remove a few entries from the cache (sparse pattern of keys).
-  constexpr const int kSparseKeys[] = {1, 100, 101, 500, 501, 512, 999, 1001};
-  for (const auto key : kSparseKeys) {
-    Erase(key);
-  }
-  ASSERT_EQ(ARRAYSIZE(kSparseKeys), evicted_keys_.size());
-
-  // All inserted entries, except for the removed one, should be invalidated.
-  ASSERT_EQ(kEntriesNum - ARRAYSIZE(kSparseKeys), cache_->Invalidate({}));
-  // In the end, no entries should be left in the cache.
-  ASSERT_EQ(kEntriesNum, evicted_keys_.size());
-}
-
-TEST_P(CacheTest, InvalidateNoEntries) {
-  constexpr const int kEntriesNum = 10;
-  // This scenarios assumes no evictions are done at the cache capacity.
-  ASSERT_LE(kEntriesNum, cache_size());
-
-  const Cache::ValidityFunc func = [](Slice /* key */, Slice /* value */) {
-    return true;
-  };
-  // Running invalidation on empty cache should yield no invalidated entries.
-  ASSERT_EQ(0, cache_->Invalidate({ func }));
-
-  for (auto i = 0; i < kEntriesNum; ++i) {
-    Insert(i, i);
-  }
-
-  // No entries should be invalidated since the validity function considers
-  // all entries valid.
-  ASSERT_EQ(0, cache_->Invalidate({ func }));
-  ASSERT_TRUE(evicted_keys_.empty());
-}
-
-TEST_P(CacheTest, InvalidateNoEntriesNoAdvanceIterationFunctor) {
-  constexpr const int kEntriesNum = 256;
-  // This scenarios assumes no evictions are done at the cache capacity.
-  ASSERT_LE(kEntriesNum, cache_size());
-
-  const Cache::InvalidationControl ctl = {
-    Cache::kInvalidateAllEntriesFunc,
-    [](size_t /* valid_entries_count */, size_t /* invalid_entries_count */) {
-      // Never advance over the item list.
-      return false;
-    }
-  };
-
-  // Running invalidation on empty cache should yield no invalidated entries.
-  ASSERT_EQ(0, cache_->Invalidate(ctl));
-
-  for (auto i = 0; i < kEntriesNum; ++i) {
-    Insert(i, i);
-  }
-
-  // No entries should be invalidated since the iteration functor doesn't
-  // advance over the list of entries, even if every entry is declared invalid.
-  ASSERT_EQ(0, cache_->Invalidate(ctl));
-  // In the end, all entries should be in the cache.
-  ASSERT_EQ(0, evicted_keys_.size());
-}
-
-TEST_P(CacheTest, InvalidateOddKeyEntries) {
-  constexpr const int kEntriesNum = 64;
-  // This scenarios assumes no evictions are done at the cache capacity.
-  ASSERT_LE(kEntriesNum, cache_size());
-
-  const Cache::ValidityFunc func = [](Slice key, Slice /* value */) {
-    return DecodeInt(key) % 2 == 0;
-  };
-  // Running invalidation on empty cache should yield no invalidated entries.
-  ASSERT_EQ(0, cache_->Invalidate({ func }));
-
-  for (auto i = 0; i < kEntriesNum; ++i) {
-    Insert(i, i);
-  }
-  ASSERT_EQ(kEntriesNum / 2, cache_->Invalidate({ func }));
-  ASSERT_EQ(kEntriesNum / 2, evicted_keys_.size());
-  for (auto i = 0; i < kEntriesNum; ++i) {
-    if (i % 2 == 0) {
-      ASSERT_EQ(i,  Lookup(i));
-    } else {
-      ASSERT_EQ(-1,  Lookup(i));
-    }
-  }
-}
-
-// This class is dedicated for scenarios specific for FIFOCache.
-// The scenarios use a single-shard cache for simpler logic.
-class FIFOCacheTest : public CacheBaseTest {
- public:
-  FIFOCacheTest()
-      : CacheBaseTest(10 * 1024) {
-  }
-
-  void SetUp() override {
-    SetupWithParameters(Cache::MemoryType::DRAM,
-                        Cache::EvictionPolicy::FIFO,
-                        ShardingPolicy::SingleShard);
-  }
-};
-
-// Verify how the eviction behavior of a FIFO cache.
-TEST_F(FIFOCacheTest, EvictionPolicy) {
-  static constexpr int kNumElems = 20;
-  const int size_per_elem = cache_size() / kNumElems;
-  // First data chunk: fill the cache up to the capacity.
-  int idx = 0;
-  do {
-    Insert(idx, idx, size_per_elem);
-    // Keep looking up the very first entry: this is to make sure lookups
-    // do not affect the recency criteria of the eviction policy for FIFO cache.
-    Lookup(0);
-    ++idx;
-  } while (evicted_keys_.empty());
-  ASSERT_GT(idx, 1);
-
-  // Make sure the earliest inserted entry was evicted.
-  ASSERT_EQ(-1, Lookup(0));
-
-  // Verify that the 'empirical' capacity matches the expected capacity
-  // (it's a single-shard cache).
-  const int capacity = idx - 1;
-  ASSERT_EQ(kNumElems, capacity);
-
-  // Second data chunk: add (capacity / 2) more elements.
-  for (int i = 1; i < capacity / 2; ++i) {
-    // Earlier inserted elements should be gone one-by-one as new elements are
-    // inserted, and lookups should not affect the recency criteria of the FIFO
-    // eviction policy.
-    ASSERT_EQ(i, Lookup(i));
-    Insert(capacity + i, capacity + i, size_per_elem);
-    ASSERT_EQ(capacity + i, Lookup(capacity + i));
-    ASSERT_EQ(-1, Lookup(i));
-  }
-  ASSERT_EQ(capacity / 2, evicted_keys_.size());
-
-  // Early inserted elements from the first chunk should be evicted
-  // to accommodate the elements from the second chunk.
-  for (int i = 0; i < capacity / 2; ++i) {
-    SCOPED_TRACE(Substitute("early inserted elements: index $0", i));
-    ASSERT_EQ(-1, Lookup(i));
-  }
-  // The later inserted elements from the first chunk should be still
-  // in the cache.
-  for (int i = capacity / 2; i < capacity; ++i) {
-    SCOPED_TRACE(Substitute("late inserted elements: index $0", i));
-    ASSERT_EQ(i, Lookup(i));
-  }
-}
-
-class LRUCacheTest :
-    public CacheBaseTest,
-    public ::testing::WithParamInterface<ShardingPolicy> {
- public:
-  LRUCacheTest()
-      : CacheBaseTest(16 * 1024 * 1024) {
-  }
-
-  void SetUp() override {
-    const auto& param = GetParam();
-    SetupWithParameters(Cache::MemoryType::DRAM,
-                        Cache::EvictionPolicy::LRU,
-                        param);
-  }
-};
-
-INSTANTIATE_TEST_CASE_P(
-    CacheTypes, LRUCacheTest,
-    ::testing::Values(ShardingPolicy::MultiShard,
-                      ShardingPolicy::SingleShard));
-
-TEST_P(LRUCacheTest, EvictionPolicy) {
-  static constexpr int kNumElems = 1000;
-  const int size_per_elem = cache_size() / kNumElems;
-
-  Insert(100, 101);
-  Insert(200, 201);
-
-  // Loop adding and looking up new entries, but repeatedly accessing key 101.
-  // This frequently-used entry should not be evicted.
-  for (int i = 0; i < kNumElems + 1000; i++) {
-    Insert(1000+i, 2000+i, size_per_elem);
-    ASSERT_EQ(2000+i, Lookup(1000+i));
-    ASSERT_EQ(101, Lookup(100));
-  }
-  ASSERT_EQ(101, Lookup(100));
-  // Since '200' wasn't accessed in the loop above, it should have
-  // been evicted.
-  ASSERT_EQ(-1, Lookup(200));
-}
-
 }  // namespace impala
 
diff --git a/be/src/util/cache/cache-test.h b/be/src/util/cache/cache-test.h
new file mode 100644
index 0000000..279c2f9
--- /dev/null
+++ b/be/src/util/cache/cache-test.h
@@ -0,0 +1,107 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "util/cache/cache.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/coding.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/slice.h"
+
+namespace impala {
+
+// Conversions between numeric keys/values and the types expected by Cache.
+static std::string EncodeInt(int k) {
+  kudu::faststring result;
+  kudu::PutFixed32(&result, k);
+  return result.ToString();
+}
+static int DecodeInt(const Slice& k) {
+  CHECK_EQ(4, k.size());
+  return kudu::DecodeFixed32(k.data());
+}
+
+// Cache sharding policy affects the composition of the cache. Some test
+// scenarios assume cache is single-sharded to keep the logic simpler.
+enum class ShardingPolicy {
+  MultiShard,
+  SingleShard,
+};
+
+// CacheBaseTest provides a base test class for testing a variety of different cache
+// eviction policies (LRU, LIRS, FIFO). It wraps the common functions to simplify the
+// calls for tests and to use integer keys and values, so that tests can avoid using
+// Slice types directly. It also maintains a record of all evictions that take place,
+// both the keys and the values.
+// NOTE: The structures are not synchronized, so this is only useful for single-threaded
+// tests.
+class CacheBaseTest : public ::testing::Test,
+                      public Cache::EvictionCallback {
+ public:
+  explicit CacheBaseTest(size_t cache_size)
+      : cache_size_(cache_size) {
+  }
+
+  size_t cache_size() const {
+    return cache_size_;
+  }
+
+  // Implementation of the EvictionCallback interface. This maintains a record of all
+  // evictions, keys and values.
+  void EvictedEntry(Slice key, Slice val) override {
+    evicted_keys_.push_back(DecodeInt(key));
+    evicted_values_.push_back(DecodeInt(val));
+  }
+
+  // Lookup the key. Return the value on success or -1 on failure.
+  int Lookup(int key, Cache::LookupBehavior behavior = Cache::NORMAL) {
+    auto handle(cache_->Lookup(EncodeInt(key), behavior));
+    return handle ? DecodeInt(cache_->Value(handle)) : -1;
+  }
+
+  // Insert a key with the give value and charge. Return whether the insert was
+  // successful.
+  bool Insert(int key, int value, int charge = 1) {
+    std::string key_str = EncodeInt(key);
+    std::string val_str = EncodeInt(value);
+    auto handle(cache_->Allocate(key_str, val_str.size(), charge));
+    // This can be null if Allocate rejects something with this charge.
+    if (handle == nullptr) return false;
+    memcpy(cache_->MutableValue(&handle), val_str.data(), val_str.size());
+    auto inserted_handle(cache_->Insert(std::move(handle), this));
+    // Insert can fail and return nullptr
+    if (inserted_handle == nullptr) return false;
+    return true;
+  }
+
+  void Erase(int key) {
+    cache_->Erase(EncodeInt(key));
+  }
+
+ protected:
+  // This initializes the cache with the specified 'eviction_policy' and
+  // 'sharding_policy'. Child classes frequently call this from the SetUp() method.
+  // For example, there are shared tests that apply to all eviction algorithms. This
+  // function is used during SetUp() to allow those shared tests to be parameterized to
+  // use various combinations of eviction policy and sharding policy.
+  void SetupWithParameters(Cache::EvictionPolicy eviction_policy,
+      ShardingPolicy sharding_policy);
+
+  const size_t cache_size_;
+  // Record of all evicted keys/values. When an entry is evicted, its key is put in
+  // evicted_keys_ and its value is put in evicted_values_. These are inserted in
+  // the order they are evicted (i.e. index 0 happened before index 1).
+  std::vector<int> evicted_keys_;
+  std::vector<int> evicted_values_;
+  std::shared_ptr<kudu::MemTracker> mem_tracker_;
+  std::unique_ptr<Cache> cache_;
+};
+
+} // namespace impala
diff --git a/be/src/util/cache/cache.cc b/be/src/util/cache/cache.cc
index c9f97a7..ef27168 100644
--- a/be/src/util/cache/cache.cc
+++ b/be/src/util/cache/cache.cc
@@ -17,6 +17,7 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "common/status.h"
 #include "kudu/gutil/bits.h"
 #include "kudu/gutil/hash/city.h"
 #include "kudu/gutil/macros.h"
@@ -36,8 +37,6 @@
 // Useful in tests that require accurate cache capacity accounting.
 DECLARE_bool(cache_force_single_shard);
 
-DECLARE_double(cache_memtracker_approximation_ratio);
-
 using std::atomic;
 using std::shared_ptr;
 using std::string;
@@ -59,356 +58,6 @@ const Cache::IterationFunc Cache::kIterateOverAllEntriesFunc = [](
   return true;
 };
 
-namespace {
-
-// Recency list cache implementations (FIFO, LRU, etc.)
-
-// Recency list handle. An entry is a variable length heap-allocated structure.
-// Entries are kept in a circular doubly linked list ordered by some recency
-// criterion (e.g., access time for LRU policy, insertion time for FIFO policy).
-class RLHandle : public HandleBase {
-public:
-  RLHandle(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, int charge)
-    : HandleBase(kv_ptr, key, hash, val_len, charge) {}
-
-  RLHandle()
-    : HandleBase(nullptr, Slice(), 0, 0, 0) {}
-
-  Cache::EvictionCallback* eviction_callback;
-  RLHandle* next;
-  RLHandle* prev;
-  std::atomic<int32_t> refs;
-};
-
-// A single shard of a cache that uses a recency list based eviction policy
-template<Cache::EvictionPolicy policy>
-class RLCacheShard : public CacheShard {
-  static_assert(
-      policy == Cache::EvictionPolicy::LRU || policy == Cache::EvictionPolicy::FIFO,
-      "RLCacheShard only supports LRU or FIFO");
-
- public:
-  explicit RLCacheShard(kudu::MemTracker* tracker);
-  ~RLCacheShard();
-
-  // Separate from constructor so caller can easily make an array of CacheShard
-  void SetCapacity(size_t capacity) override {
-    capacity_ = capacity;
-    max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
-  }
-
-  HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) override;
-  void Free(HandleBase* handle) override;
-  HandleBase* Insert(HandleBase* handle,
-      Cache::EvictionCallback* eviction_callback) override;
-  HandleBase* Lookup(const Slice& key, uint32_t hash, bool caching) override;
-  void Release(HandleBase* handle) override;
-  void Erase(const Slice& key, uint32_t hash) override;
-  size_t Invalidate(const Cache::InvalidationControl& ctl) override;
-
- private:
-  void RL_Remove(RLHandle* e);
-  void RL_Append(RLHandle* e);
-  // Update the recency list after a lookup operation.
-  void RL_UpdateAfterLookup(RLHandle* e);
-  // Just reduce the reference count by 1.
-  // Return true if last reference
-  bool Unref(RLHandle* e);
-  // Call the user's eviction callback, if it exists, and free the entry.
-  void FreeEntry(RLHandle* e);
-
-
-  // Update the memtracker's consumption by the given amount.
-  //
-  // This "buffers" the updates locally in 'deferred_consumption_' until the amount
-  // of accumulated delta is more than ~1% of the cache capacity. This improves
-  // performance under workloads with high eviction rates for a few reasons:
-  //
-  // 1) once the cache reaches its full capacity, we expect it to remain there
-  // in steady state. Each insertion is usually matched by an eviction, and unless
-  // the total size of the evicted item(s) is much different than the size of the
-  // inserted item, each eviction event is unlikely to change the total cache usage
-  // much. So, we expect that the accumulated error will mostly remain around 0
-  // and we can avoid propagating changes to the MemTracker at all.
-  //
-  // 2) because the cache implementation is sharded, we do this tracking in a bunch
-  // of different locations, avoiding bouncing cache-lines between cores. By contrast
-  // the MemTracker is a simple integer, so it doesn't scale as well under concurrency.
-  //
-  // Positive delta indicates an increased memory consumption.
-  void UpdateMemTracker(int64_t delta);
-
-  // Initialized before use.
-  size_t capacity_;
-
-  // mutex_ protects the following state.
-  kudu::simple_spinlock mutex_;
-  size_t usage_;
-
-  // Dummy head of recency list.
-  // rl.prev is newest entry, rl.next is oldest entry.
-  RLHandle rl_;
-
-  HandleTable table_;
-
-  kudu::MemTracker* mem_tracker_;
-  atomic<int64_t> deferred_consumption_ { 0 };
-
-  // Initialized based on capacity_ to ensure an upper bound on the error on the
-  // MemTracker consumption.
-  int64_t max_deferred_consumption_;
-};
-
-template<Cache::EvictionPolicy policy>
-RLCacheShard<policy>::RLCacheShard(kudu::MemTracker* tracker)
-  : usage_(0),
-    mem_tracker_(tracker) {
-  // Make empty circular linked list.
-  rl_.next = &rl_;
-  rl_.prev = &rl_;
-}
-
-template<Cache::EvictionPolicy policy>
-RLCacheShard<policy>::~RLCacheShard() {
-  for (RLHandle* e = rl_.next; e != &rl_; ) {
-    RLHandle* next = e->next;
-    DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1)
-        << "caller has an unreleased handle";
-    if (Unref(e)) {
-      FreeEntry(e);
-    }
-    e = next;
-  }
-  mem_tracker_->Consume(deferred_consumption_);
-}
-
-template<Cache::EvictionPolicy policy>
-bool RLCacheShard<policy>::Unref(RLHandle* e) {
-  DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
-  return e->refs.fetch_sub(1) == 1;
-}
-
-template<Cache::EvictionPolicy policy>
-void RLCacheShard<policy>::FreeEntry(RLHandle* e) {
-  DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
-  if (e->eviction_callback) {
-    e->eviction_callback->EvictedEntry(e->key(), e->value());
-  }
-  UpdateMemTracker(-static_cast<int64_t>(e->charge()));
-  Free(e);
-}
-
-template<Cache::EvictionPolicy policy>
-void RLCacheShard<policy>::UpdateMemTracker(int64_t delta) {
-  int64_t old_deferred = deferred_consumption_.fetch_add(delta);
-  int64_t new_deferred = old_deferred + delta;
-
-  if (new_deferred > max_deferred_consumption_ ||
-      new_deferred < -max_deferred_consumption_) {
-    int64_t to_propagate = deferred_consumption_.exchange(0, std::memory_order_relaxed);
-    mem_tracker_->Consume(to_propagate);
-  }
-}
-
-template<Cache::EvictionPolicy policy>
-void RLCacheShard<policy>::RL_Remove(RLHandle* e) {
-  e->next->prev = e->prev;
-  e->prev->next = e->next;
-  DCHECK_GE(usage_, e->charge());
-  usage_ -= e->charge();
-}
-
-template<Cache::EvictionPolicy policy>
-void RLCacheShard<policy>::RL_Append(RLHandle* e) {
-  // Make "e" newest entry by inserting just before rl_.
-  e->next = &rl_;
-  e->prev = rl_.prev;
-  e->prev->next = e;
-  e->next->prev = e;
-  usage_ += e->charge();
-}
-
-template<>
-void RLCacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle* /* e */) {
-}
-
-template<>
-void RLCacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) {
-  RL_Remove(e);
-  RL_Append(e);
-}
-
-template<Cache::EvictionPolicy policy>
-HandleBase* RLCacheShard<policy>::Allocate(Slice key, uint32_t hash, int val_len,
-    int charge) {
-  int key_len = key.size();
-  DCHECK_GE(key_len, 0);
-  DCHECK_GE(val_len, 0);
-  int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
-  uint8_t* buf = new uint8_t[sizeof(RLHandle)
-                             + key_len_padded + val_len]; // the kv_data VLA data
-  // TODO(KUDU-1091): account for the footprint of structures used by Cache's
-  //                  internal housekeeping (RL handles, etc.) in case of
-  //                  non-automatic charge.
-  int calc_charge =
-    (charge == Cache::kAutomaticCharge) ? kudu::kudu_malloc_usable_size(buf) : charge;
-  uint8_t* kv_ptr = buf + sizeof(RLHandle);
-  RLHandle* handle = new (buf) RLHandle(kv_ptr, key, hash, val_len,
-      calc_charge);
-  return handle;
-}
-
-template<Cache::EvictionPolicy policy>
-void RLCacheShard<policy>::Free(HandleBase* handle) {
-  // We allocate the handle as a uint8_t array, then we call a placement new,
-  // which calls the constructor. For symmetry, we call the destructor and then
-  // delete on the uint8_t array.
-  RLHandle* h = static_cast<RLHandle*>(handle);
-  h->~RLHandle();
-  uint8_t* data = reinterpret_cast<uint8_t*>(handle);
-  delete [] data;
-}
-
-template<Cache::EvictionPolicy policy>
-HandleBase* RLCacheShard<policy>::Lookup(const Slice& key,
-                                         uint32_t hash,
-                                         bool caching) {
-  RLHandle* e;
-  {
-    std::lock_guard<decltype(mutex_)> l(mutex_);
-    e = static_cast<RLHandle*>(table_.Lookup(key, hash));
-    if (e != nullptr) {
-      e->refs.fetch_add(1, std::memory_order_relaxed);
-      RL_UpdateAfterLookup(e);
-    }
-  }
-
-  return e;
-}
-
-template<Cache::EvictionPolicy policy>
-void RLCacheShard<policy>::Release(HandleBase* handle) {
-  RLHandle* e = static_cast<RLHandle*>(handle);
-  bool last_reference = Unref(e);
-  if (last_reference) {
-    FreeEntry(e);
-  }
-}
-
-template<Cache::EvictionPolicy policy>
-HandleBase* RLCacheShard<policy>::Insert(
-    HandleBase* handle_in,
-    Cache::EvictionCallback* eviction_callback) {
-  RLHandle* handle = static_cast<RLHandle*>(handle_in);
-  // Set the remaining RLHandle members which were not already allocated during
-  // Allocate().
-  handle->eviction_callback = eviction_callback;
-  // Two refs for the handle: one from RLCacheShard, one for the returned handle.
-  handle->refs.store(2, std::memory_order_relaxed);
-  UpdateMemTracker(handle->charge());
-
-  RLHandle* to_remove_head = nullptr;
-  {
-    std::lock_guard<decltype(mutex_)> l(mutex_);
-
-    RL_Append(handle);
-
-    RLHandle* old = static_cast<RLHandle*>(table_.Insert(handle));
-    if (old != nullptr) {
-      RL_Remove(old);
-      if (Unref(old)) {
-        old->next = to_remove_head;
-        to_remove_head = old;
-      }
-    }
-
-    while (usage_ > capacity_ && rl_.next != &rl_) {
-      RLHandle* old = rl_.next;
-      RL_Remove(old);
-      table_.Remove(old->key(), old->hash());
-      if (Unref(old)) {
-        old->next = to_remove_head;
-        to_remove_head = old;
-      }
-    }
-  }
-
-  // we free the entries here outside of mutex for
-  // performance reasons
-  while (to_remove_head != nullptr) {
-    RLHandle* next = to_remove_head->next;
-    FreeEntry(to_remove_head);
-    to_remove_head = next;
-  }
-
-  return handle;
-}
-
-template<Cache::EvictionPolicy policy>
-void RLCacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
-  RLHandle* e;
-  bool last_reference = false;
-  {
-    std::lock_guard<decltype(mutex_)> l(mutex_);
-    e = static_cast<RLHandle*>(table_.Remove(key, hash));
-    if (e != nullptr) {
-      RL_Remove(e);
-      last_reference = Unref(e);
-    }
-  }
-  // mutex not held here
-  // last_reference will only be true if e != NULL
-  if (last_reference) {
-    FreeEntry(e);
-  }
-}
-
-template<Cache::EvictionPolicy policy>
-size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
-  size_t invalid_entry_count = 0;
-  size_t valid_entry_count = 0;
-  RLHandle* to_remove_head = nullptr;
-
-  {
-    std::lock_guard<decltype(mutex_)> l(mutex_);
-
-    // rl_.next is the oldest (a.k.a. least relevant) entry in the recency list.
-    RLHandle* h = rl_.next;
-    while (h != nullptr && h != &rl_ &&
-           ctl.iteration_func(valid_entry_count, invalid_entry_count)) {
-      if (ctl.validity_func(h->key(), h->value())) {
-        // Continue iterating over the list.
-        h = h->next;
-        ++valid_entry_count;
-        continue;
-      }
-      // Copy the handle slated for removal.
-      RLHandle* h_to_remove = h;
-      // Prepare for next iteration of the cycle.
-      h = h->next;
-
-      RL_Remove(h_to_remove);
-      table_.Remove(h_to_remove->key(), h_to_remove->hash());
-      if (Unref(h_to_remove)) {
-        h_to_remove->next = to_remove_head;
-        to_remove_head = h_to_remove;
-      }
-      ++invalid_entry_count;
-    }
-  }
-  // Once removed from the lookup table and the recency list, the entries
-  // with no references left must be deallocated because Cache::Release()
-  // wont be called for them from elsewhere.
-  while (to_remove_head != nullptr) {
-    RLHandle* next = to_remove_head->next;
-    FreeEntry(to_remove_head);
-    to_remove_head = next;
-  }
-  return invalid_entry_count;
-}
-
-}  // end anonymous namespace
-
 // Determine the number of bits of the hash that should be used to determine
 // the cache shard. This, in turn, determines the number of shards.
 int DetermineShardBits() {
@@ -424,39 +73,30 @@ string ToString(Cache::EvictionPolicy p) {
       return "fifo";
     case Cache::EvictionPolicy::LRU:
       return "lru";
+    case Cache::EvictionPolicy::LIRS:
+      return "lirs";
     default:
       LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
   }
   return "unknown";
 }
 
-template<>
-Cache* NewCache<Cache::EvictionPolicy::FIFO,
-                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
-  return new ShardedCache<Cache::EvictionPolicy::FIFO>(capacity, id);
-}
-
-template<>
-Cache* NewCache<Cache::EvictionPolicy::LRU,
-                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
-  return new ShardedCache<Cache::EvictionPolicy::LRU>(capacity, id);
-}
-
-template<Cache::EvictionPolicy policy>
-CacheShard* NewCacheShard(kudu::MemTracker* mem_tracker) {
-  return new RLCacheShard<policy>(mem_tracker);
-}
-
-std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
-  switch (mem_type) {
-    case Cache::MemoryType::DRAM:
-      os << "DRAM";
-      break;
+CacheShard* NewCacheShard(Cache::EvictionPolicy policy, kudu::MemTracker* mem_tracker,
+    size_t capacity) {
+  switch (policy) {
+    case Cache::EvictionPolicy::FIFO:
+      return NewCacheShardInt<Cache::EvictionPolicy::FIFO>(mem_tracker, capacity);
+    case Cache::EvictionPolicy::LRU:
+      return NewCacheShardInt<Cache::EvictionPolicy::LRU>(mem_tracker, capacity);
+    case Cache::EvictionPolicy::LIRS:
+      return NewCacheShardInt<Cache::EvictionPolicy::LIRS>(mem_tracker, capacity);
     default:
-      os << "unknown (" << static_cast<int>(mem_type) << ")";
-      break;
+      LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(policy);
   }
-  return os;
+}
+
+Cache* NewCache(Cache::EvictionPolicy policy, size_t capacity, const std::string& id) {
+  return new ShardedCache(policy, capacity, id);
 }
 
 }  // namespace impala
diff --git a/be/src/util/cache/cache.h b/be/src/util/cache/cache.h
index a64fe19..aac1729 100644
--- a/be/src/util/cache/cache.h
+++ b/be/src/util/cache/cache.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <boost/algorithm/string.hpp>
 #include <cstddef>
 #include <cstdint>
 #include <functional>
@@ -27,6 +28,7 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/util/slice.h"
+#include "common/status.h"
 
 using kudu::Slice;
 
@@ -34,11 +36,6 @@ namespace impala {
 
 class Cache {
  public:
-  // Type of memory backing the cache's storage.
-  enum class MemoryType {
-    DRAM
-  };
-
   // Supported eviction policies for the cache. Eviction policy determines what
   // items to evict if the cache is at capacity when trying to accommodate
   // an extra item.
@@ -48,8 +45,24 @@ class Cache {
 
     // The least-recently-used items are evicted.
     LRU,
+
+    // LIRS (Low Inter-reference Recency Set)
+    LIRS,
   };
 
+  static EvictionPolicy ParseEvictionPolicy(const std::string& policy_string) {
+    std::string upper_policy = boost::to_upper_copy(policy_string);
+    if (upper_policy == "LRU") {
+      return Cache::EvictionPolicy::LRU;
+    } else if (upper_policy == "LIRS") {
+      return Cache::EvictionPolicy::LIRS;
+    } else if (upper_policy == "FIFO") {
+      return Cache::EvictionPolicy::FIFO;
+    }
+    LOG(FATAL) << "Unsupported eviction policy: " << policy_string;
+    return Cache::EvictionPolicy::LRU;
+  }
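+
+  // A usage sketch (hypothetical caller code, assuming a startup flag such as the
+  // data cache's --data_cache_eviction_policy):
+  //
+  //   Cache::EvictionPolicy policy =
+  //       Cache::ParseEvictionPolicy(FLAGS_data_cache_eviction_policy);
+  //   std::unique_ptr<Cache> cache(NewCache(policy, capacity, "data cache"));
+  //   RETURN_IF_ERROR(cache->Init());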
+
   // Callback interface which is called when an entry is evicted from the
   // cache.
   class EvictionCallback {
@@ -64,6 +77,11 @@ class Cache {
   // function that was passed to the constructor.
   virtual ~Cache();
 
+  // Initialization function for the cache. This must be called before using
+  // any other methods. It returns an error Status if there is any issue with
+  // the cache or its parameters.
+  virtual Status Init() = 0;
+
   // Opaque handle to an entry stored in the cache.
   struct Handle { };
 
@@ -120,20 +138,26 @@ class Cache {
   // to facilitate automatic reference counting newly allocated cache's handles.
   typedef std::unique_ptr<PendingHandle, PendingHandleDeleter> UniquePendingHandle;
 
-  // Passing EXPECT_IN_CACHE will increment the hit/miss metrics that track the number of times
-  // blocks were requested that the users were hoping to get the block from the cache, along with
-  // with the basic metrics.
-  // Passing NO_EXPECT_IN_CACHE will only increment the basic metrics.
-  // This helps in determining if we are effectively caching the blocks that matter the most.
-  enum CacheBehavior {
-    EXPECT_IN_CACHE,
-    NO_EXPECT_IN_CACHE
+  // There are times when callers may want to perform a Lookup() without having it
+  // impact the priorities of the cache elements. For example, the caller may be
+  // doing a Lookup() as an internal part of its implementation that does not
+  // correspond to actual user activity (or that duplicates user activity).
+  //
+  // When LookupBehavior is NORMAL (the default), the cache policy will update the
+  // priorities of entries. When LookupBehavior is NO_UPDATE, the cache policy will
+  // not update the priority of any entry. A short usage sketch follows the enum
+  // below.
+  enum LookupBehavior {
+    NORMAL,
+    NO_UPDATE
   };
 
   // If the cache has no mapping for "key", returns NULL.
   //
   // Else return a handle that corresponds to the mapping.
   //
+  // Handles are not intended to be held for significant periods of time. Also,
+  // threads should avoid getting multiple handles for the same key simultaneously.
+  //
   // Sample usage:
   //
   //   unique_ptr<Cache> cache(NewCache(...));
@@ -152,7 +176,7 @@ class Cache {
   //     ...
   //   } // 'h' is automatically released here
   //
-  virtual UniqueHandle Lookup(const Slice& key, CacheBehavior caching) = 0;
+  virtual UniqueHandle Lookup(const Slice& key, LookupBehavior behavior = NORMAL) = 0;
 
   // If the cache contains entry for key, erase it.  Note that the
   // underlying entry will be kept around until all existing handles
@@ -221,14 +245,19 @@ class Cache {
 
   // Commit a prepared entry into the cache.
   //
-  // Returns a handle that corresponds to the mapping. This method always
-  // succeeds and returns a non-null entry, since the space was reserved above.
-  //
   // The 'pending' entry passed here should have been allocated using
   // Cache::Allocate() above.
   //
+  // This method is not guaranteed to succeed. If it succeeds, it returns a handle
+  // that corresponds to the mapping. If it fails, it returns a null handle.
+  //
+  // Handles are not intended to be held for significant periods of time. Also,
+  // threads should avoid getting multiple handles for the same key simultaneously.
+  //
   // If 'eviction_callback' is non-NULL, then it will be called when the
-  // entry is later evicted or when the cache shuts down.
+  // entry is later evicted or when the cache shuts down. If the Insert()
+  // fails, the entry is immediately destroyed and the eviction callback is called
+  // before the return of Insert().
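+  //
+  // A minimal sketch of handling a failed Insert() (hypothetical caller code):
+  //
+  //   Cache::UniqueHandle h(cache->Insert(std::move(pending), eviction_callback));
+  //   if (!h) {
+  //     // The entry was rejected; it has already been destroyed and
+  //     // 'eviction_callback' (if non-NULL) has already been invoked.
+  //   }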
   virtual UniqueHandle Insert(UniquePendingHandle pending,
                               EvictionCallback* eviction_callback) = 0;
 
@@ -333,27 +362,8 @@ class Cache {
   DISALLOW_COPY_AND_ASSIGN(Cache);
 };
 
-// A template helper function to instantiate a cache of particular
-// 'eviction_policy' flavor, backed by the given storage 'mem_type',
-// where 'capacity' specifies the capacity of the result cache,
-// and 'id' specifies its identifier.
-template<Cache::EvictionPolicy eviction_policy = Cache::EvictionPolicy::LRU,
-         Cache::MemoryType mem_type = Cache::MemoryType::DRAM>
-Cache* NewCache(size_t capacity, const std::string& id);
-
-// Create a new FIFO cache with a fixed size capacity. This implementation
-// of Cache uses the first-in-first-out eviction policy and stored in DRAM.
-template<>
-Cache* NewCache<Cache::EvictionPolicy::FIFO,
-                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id);
-
-// Create a new LRU cache with a fixed size capacity. This implementation
-// of Cache uses the least-recently-used eviction policy and stored in DRAM.
-template<>
-Cache* NewCache<Cache::EvictionPolicy::LRU,
-                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id);
-
-// A helper method to output cache memory type into ostream.
-std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type);
+// Instantiate a cache of a particular 'policy' flavor with the specified 'capacity'
+// and identifier 'id'.
+Cache* NewCache(Cache::EvictionPolicy policy, size_t capacity, const std::string& id);
 
 } // namespace impala
diff --git a/be/src/util/cache/lirs-cache-test.cc b/be/src/util/cache/lirs-cache-test.cc
new file mode 100644
index 0000000..2f1a96a
--- /dev/null
+++ b/be/src/util/cache/lirs-cache-test.cc
@@ -0,0 +1,557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/cache/cache.h"
+#include "util/cache/cache-test.h"
+
+#include <cstring>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/slice.h"
+
+DECLARE_double(lirs_unprotected_percentage);
+DECLARE_double(lirs_tombstone_multiple);
+
+namespace impala {
+
+class LIRSCacheTest : public CacheBaseTest {
+ public:
+  LIRSCacheTest()
+    : CacheBaseTest(100) {
+    FLAGS_lirs_unprotected_percentage = 5.0;
+    FLAGS_lirs_tombstone_multiple = 2.0;
+  }
+
+  void SetUp() override {
+    SetupWithParameters(Cache::EvictionPolicy::LIRS, ShardingPolicy::SingleShard);
+  }
+
+ protected:
+  // This fills the cache with 100 elements (0-99) and verifies that there were no
+  // evictions. Entries are inserted into the protected space until it is full
+  // (95 spots, so 0-94), then they are inserted as unprotected (5 spots, so 95-99).
+  void FillCache() {
+    for (int i = 0; i < 100; ++i) {
+      ASSERT_TRUE(Insert(i, i));
+    }
+    ASSERT_EQ(evicted_keys_.size(), 0);
+    ASSERT_EQ(evicted_values_.size(), 0);
+  }
+
+  // This fills the cache and keeps adding elements until it reaches the limit on the
+  // number of tombstones. This will add 300 elements. 100 remain in the cache at the
+  // end along with 200 tombstones. This also clears evicted_keys_ and evicted_values_
+  // to provide a clean slate for tests.
+  void FillCacheToTombstoneLimit() {
+    for (int i = 0; i < 300; ++i) {
+      ASSERT_TRUE(Insert(i, i));
+    }
+    // Items 0-94 remain protected.
+    // Items 95-294 were evicted and are tombstones.
+    // Items 295-299 are unprotected.
+    for (int i = 0; i < 200; ++i) {
+      ASSERT_EQ(evicted_keys_[i], i+95);
+      ASSERT_EQ(evicted_values_[i], i+95);
+    }
+    evicted_keys_.clear();
+    evicted_values_.clear();
+  }
+
+  // This forces all entries to be evicted. It repeatedly references entirely new
+  // elements twice in a row, forcing them to become protected. The preexisting
+  // elements should be evicted starting with the unprotected in FIFO order followed by
+  // the protected in FIFO order.
+  void FlushCache() {
+    for (int i = 0; i < 100; ++i) {
+      // Insert as unprotected
+      ASSERT_TRUE(Insert(1000 + i, 1000 + i));
+      // Lookup moves to protected
+      ASSERT_EQ(Lookup(1000 + i), 1000 + i);
+    }
+  }
+};
+
+TEST_F(LIRSCacheTest, BasicEvictionOrdering) {
+  // This tests a very simple case: insert entries, then flush the cache and verify
+  // entries are evicted in the appropriate order.
+
+  FillCache();
+  FlushCache();
+
+  // There were 5 unprotected elements (95-99). They should be the first to be evicted.
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 95+i);
+    ASSERT_EQ(evicted_values_[i], 95+i);
+  }
+  // There were 95 protected elements (0-94). They should be evicted in order.
+  for (int i = 5; i < 100; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-5);
+    ASSERT_EQ(evicted_values_[i], i-5);
+  }
+}
+
+TEST_F(LIRSCacheTest, LookupNoUpdate) {
+  // Lookup operations can be tagged as NO_UPDATE, in which case, nothing should change
+  // priority. Verify that adding NO_UPDATE Lookups to the basic case does not change
+  // the eviction order because nothing moves.
+
+  FillCache();
+
+  // Do a NO_UPDATE lookup on an unprotected element (which otherwise would become
+  // protected)
+  Lookup(97, Cache::NO_UPDATE);
+
+  // Do a NO_UPDATE lookup on a protected element (which otherwise would move in the
+  // recency queue).
+  Lookup(55, Cache::NO_UPDATE);
+
+  FlushCache();
+
+  // There were 5 unprotected elements (95-99). They should be the first to be evicted.
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 95+i);
+    ASSERT_EQ(evicted_values_[i], 95+i);
+  }
+  // There were 95 protected elements (0-94). They should be evicted in order.
+  for (int i = 5; i < 100; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-5);
+    ASSERT_EQ(evicted_values_[i], i-5);
+  }
+}
+
+TEST_F(LIRSCacheTest, RejectLarge) {
+  // Allocate() rejects anything larger than the protected capacity (which is 95)
+  // Insert() returns false if Allocate() fails.
+  ASSERT_FALSE(Insert(100, 100, 96));
+  ASSERT_EQ(evicted_keys_.size(), 0);
+  ASSERT_EQ(evicted_values_.size(), 0);
+
+  // Allocate() does not reject something of exactly the protected capacity (95)
+  ASSERT_TRUE(Insert(100, 100, 95));
+  ASSERT_EQ(evicted_keys_.size(), 0);
+  ASSERT_EQ(evicted_values_.size(), 0);
+}
+
+TEST_F(LIRSCacheTest, LargeInsertUnprotectedEvict) {
+  // This tests that inserting a single large element can evict all of the UNPROTECTED
+  // elements, along with itself.
+  FillCache();
+  ASSERT_FALSE(Insert(100, 100, 6));
+  // All 5 UNPROTECTED got evicted, along with the element being inserted.
+  ASSERT_EQ(evicted_keys_.size(), 6);
+  ASSERT_EQ(evicted_values_.size(), 6);
+  for (int i = 0; i < 6; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 95+i);
+    ASSERT_EQ(evicted_values_[i], 95+i);
+  }
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  FlushCache();
+
+  // Only the protected remain, and they are all in order
+  ASSERT_EQ(evicted_keys_.size(), 95);
+  ASSERT_EQ(evicted_values_.size(), 95);
+  for (int i = 0; i < 95; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i);
+    ASSERT_EQ(evicted_values_[i], i);
+  }
+}
+
+TEST_F(LIRSCacheTest, LargeProtectedToUnprotectedEvict) {
+  // This tests that a PROTECTED element that is larger than the unprotected capacity
+  // will evict everything including itself when it transitions to UNPROTECTED.
+
+  // Insert PROTECTED element that is larger than the unprotected capacity
+  ASSERT_TRUE(Insert(0, 0, 95));
+  ASSERT_EQ(evicted_keys_.size(), 0);
+  ASSERT_EQ(evicted_values_.size(), 0);
+
+  // Insert 5 elements, which will be UNPROTECTED
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_TRUE(Insert(100+i, 100+i));
+  }
+  ASSERT_EQ(evicted_keys_.size(), 0);
+  ASSERT_EQ(evicted_values_.size(), 0);
+
+  // Looking up an UNPROTECTED element will transition it to PROTECTED, demoting
+  // the very large PROTECTED element to UNPROTECTED. That demotion overflows the
+  // unprotected capacity and evicts the remaining four UNPROTECTED elements along
+  // with the demoted element itself.
+  ASSERT_EQ(Lookup(104), 104);
+  ASSERT_EQ(evicted_keys_.size(), 5);
+  ASSERT_EQ(evicted_values_.size(), 5);
+
+  // UNPROTECTED elements were evicted first
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 100+i);
+    ASSERT_EQ(evicted_values_[i], 100+i);
+  }
+  // Large formerly PROTECTED element is evicted last
+  ASSERT_EQ(evicted_keys_[4], 0);
+  ASSERT_EQ(evicted_values_[4], 0);
+}
+
+TEST_F(LIRSCacheTest, LargeTombstone) {
+  // Large unprotected entries can transition directly to being a TOMBSTONE on Insert().
+  // This test verifies that this TOMBSTONE entry (which is larger than the unprotected
+  // capacity) can be promoted to be PROTECTED if there is another Insert().
+
+  // One protected element
+  ASSERT_TRUE(Insert(0, 0, 95));
+  ASSERT_EQ(Lookup(0), 0);
+  ASSERT_EQ(evicted_keys_.size(), 0);
+  ASSERT_EQ(evicted_values_.size(), 0);
+
+  // Large unprotected element is immediately tombstone
+  ASSERT_FALSE(Insert(1, 1, 95));
+  ASSERT_EQ(evicted_keys_.size(), 1);
+  ASSERT_EQ(evicted_keys_[0], 1);
+  ASSERT_EQ(evicted_values_[0], 1);
+  ASSERT_EQ(Lookup(1), -1);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  // Insert of tombstone turns it protected, evicting the current protected key
+  ASSERT_TRUE(Insert(1, 1, 95));
+  ASSERT_EQ(evicted_keys_.size(), 1);
+  ASSERT_EQ(evicted_keys_[0], 0);
+  ASSERT_EQ(evicted_values_[0], 0);
+  ASSERT_EQ(Lookup(1), 1);
+  ASSERT_EQ(Lookup(0), -1);
+}
+
+TEST_F(LIRSCacheTest, InsertExistingUnprotected) {
+  // This tests the behavior of insert when there is already an unprotected element with
+  // the same key. It should replace the existing value, but the new element should
+  // continue to be unprotected.
+
+  FillCache();
+
+  // Replace an unprotected key with a new value
+  Insert(95, 1095);
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(evicted_keys_[0], 95);
+  ASSERT_EQ(evicted_values_[0], 95);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  FlushCache();
+
+  // The only thing we guarantee is that key 95 is still unprotected. None of the other
+  // unprotected elements should have moved around, but the ordering is not particularly
+  // important, so this only verifies that the values are still around.
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_LT(evicted_keys_[i], 100);
+    ASSERT_GE(evicted_keys_[i], 95);
+    if (evicted_keys_[i] == 95) {
+      ASSERT_EQ(evicted_values_[i], 1095);
+    } else {
+      ASSERT_EQ(evicted_values_[i], evicted_keys_[i]);
+    }
+  }
+  // There were 95 protected elements (0-94). They are not impacted, and they are still
+  // evicted in order.
+  for (int i = 5; i < 100; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-5);
+    ASSERT_EQ(evicted_values_[i], i-5);
+  }
+}
+
+TEST_F(LIRSCacheTest, InsertExistingProtected) {
+  // This is the same as InsertExistingUnprotected, except that it is verifying that
+  // replacing an existing protected key will remain protected.
+
+  FillCache();
+
+  // Replace a protected key with a new value
+  Insert(25, 1025);
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(evicted_keys_[0], 25);
+  ASSERT_EQ(evicted_values_[0], 25);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  FlushCache();
+
+  // There were 5 unprotected elements (95-99). They are not impacted by changes in
+  // the protected elements.
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 95+i);
+    ASSERT_EQ(evicted_values_[i], 95+i);
+  }
+  // The only thing we guarantee is that key 25 is still protected. None of the other
+  // protected elements moved around, but the exact ordering is not specified.
+  for (int i = 5; i < 100; ++i) {
+    ASSERT_LT(evicted_keys_[i], 95);
+    ASSERT_GE(evicted_keys_[i], 0);
+    if (evicted_keys_[i] == 25) {
+      ASSERT_EQ(evicted_values_[i], 1025);
+    } else {
+      ASSERT_EQ(evicted_values_[i], evicted_keys_[i]);
+    }
+  }
+}
+
+TEST_F(LIRSCacheTest, UnprotectedToProtected) {
+  // This tests the behavior of lookup of an unprotected key that is more recent than
+  // the oldest protected key (i.e. it should be promoted to be protected).
+
+  FillCache();
+
+  // If we lookup 95, it will move from unprotected to protected.
+  ASSERT_EQ(Lookup(95), 95);
+  ASSERT_EQ(0, evicted_keys_.size());
+
+  FlushCache();
+
+  // The 5 unprotected elements are 96, 97, 98, 99, 0 (pushed out of protected)
+
+  // The first four evicted are the remaining original unprotected elements (96-99).
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 96+i);
+    ASSERT_EQ(evicted_values_[i], 96+i);
+  }
+  ASSERT_EQ(evicted_keys_[4], 0);
+  ASSERT_EQ(evicted_values_[4], 0);
+
+  // 0 was pushed out of protected, and 95 was added (at the tail of the recency queue).
+  // So, this is just one offset from the case in BasicEvictionOrdering.
+  for (int i = 5; i < 100; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-4);
+    ASSERT_EQ(evicted_values_[i], i-4);
+  }
+}
+
+TEST_F(LIRSCacheTest, TombstoneToProtected) {
+  // This tests the behavior of insert for a key that has a tombstone element in the
+  // cache. It should insert the new element as a protected element.
+
+  FillCache();
+
+  // Add one more element, which will evict element 95. It is now a tombstone.
+  Insert(100, 100);
+  ASSERT_EQ(evicted_values_[0], 95);
+  ASSERT_EQ(evicted_keys_[0], 95);
+  ASSERT_EQ(Lookup(95), -1);
+
+  // If we insert 95 again, it will become a protected element due to the tombstone.
+  Insert(95, 95);
+  ASSERT_EQ(evicted_keys_[1], 96);
+  ASSERT_EQ(evicted_values_[1], 96);
+  evicted_values_.clear();
+  evicted_keys_.clear();
+
+  FlushCache();
+
+  // The 5 unprotected elements are 97, 98, 99, 100, 0 (pushed out of protected)
+
+  // The first four evicted are the unprotected elements 97-100.
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 97+i);
+    ASSERT_EQ(evicted_values_[i], 97+i);
+  }
+  ASSERT_EQ(evicted_keys_[4], 0);
+  ASSERT_EQ(evicted_values_[4], 0);
+
+  // 0 was pushed out of protected, and 95 was added (at the tail of the recency queue).
+  // So, this is just one offset from the case in BasicEvictionOrdering.
+  for (int i = 5; i < 100; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-4);
+    ASSERT_EQ(evicted_values_[i], i-4);
+  }
+}
+
+TEST_F(LIRSCacheTest, UnprotectedToUnprotected) {
+
+  FillCache();
+
+  // If we lookup every element that is protected, then the unprotected elements
+  // will be trimmed from the recency queue. At that point, a lookup to the
+  // unprotected elements will not change them to be protected.
+  for (int i = 0; i < 95; ++i) {
+    ASSERT_EQ(Lookup(i), i);
+  }
+  ASSERT_EQ(0, evicted_keys_.size());
+
+  // The unprotected elements will remain in the unprotected list
+  for (int i = 95; i < 100; ++i) {
+    ASSERT_EQ(Lookup(i), i);
+  }
+  ASSERT_EQ(0, evicted_keys_.size());
+
+  FlushCache();
+
+  // All of this means that the results are the same as BasicEvictionOrdering
+  // There were 5 unprotected elements (95-99). They should be the first to be evicted.
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 95+i);
+    ASSERT_EQ(evicted_values_[i], 95+i);
+  }
+  // There were 95 protected elements (0-94). They should be evicted in order.
+  for (int i = 5; i < 100; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-5);
+    ASSERT_EQ(evicted_values_[i], i-5);
+  }
+}
+
+TEST_F(LIRSCacheTest, Erase) {
+  // This tests that Erase works for both unprotected and protected elements.
+
+  FillCache();
+
+  // Erase a protected element
+  Erase(25);
+  ASSERT_EQ(evicted_keys_[0], 25);
+  ASSERT_EQ(evicted_values_[0], 25);
+
+  // Erase an unprotected element
+  Erase(95);
+  ASSERT_EQ(evicted_keys_[1], 95);
+  ASSERT_EQ(evicted_values_[1], 95);
+
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  FlushCache();
+
+  // There were 5 unprotected elements (95-99). Element 95 was erased. Verify the
+  // remaining 4 are appropriate.
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 96+i);
+    ASSERT_EQ(evicted_values_[i], 96+i);
+  }
+  // There were 95 protected elements (0-94). Element 25 was erased. Verify the
+  // two chunks.
+  for (int i = 0; i < 25; ++i) {
+    ASSERT_EQ(evicted_keys_[i+4], i);
+    ASSERT_EQ(evicted_values_[i+4], i);
+  }
+  for (int i = 25; i < 94; ++i) {
+    ASSERT_EQ(evicted_keys_[i+4], i + 1);
+    ASSERT_EQ(evicted_values_[i+4], i + 1);
+  }
+}
+
+TEST_F(LIRSCacheTest, TombstoneLimit1) {
+  // This tests the enforcement of the tombstone limit. The lirs_tombstone_multiple is
+  // 2.0, so we expect a cache with 100 elements to maintain at most 200 tombstones.
+
+  // Fill the cache to the point where there are 100 normal elements and 200 tombstones.
+  FillCacheToTombstoneLimit();
+
+  // Now, add one final element
+  Insert(300, 300);
+  // Item 295 was evicted and became a tombstone
+  ASSERT_EQ(evicted_keys_[0], 295);
+  ASSERT_EQ(evicted_values_[0], 295);
+  ASSERT_EQ(evicted_keys_.size(), 1);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  // Item 95 was a tombstone, but we expect it to be removed due to the tombstone limit.
+  // When we insert it, it will be unprotected.
+  Insert(95, 95);
+  ASSERT_EQ(evicted_keys_.size(), 1);
+  ASSERT_EQ(evicted_keys_[0], 296);
+  ASSERT_EQ(evicted_values_[0], 296);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  FlushCache();
+
+  // The 5 unprotected elements are 297, 298, 299, 300, 95
+  vector<int> unprotected_elems = {297, 298, 299, 300, 95};
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_EQ(evicted_keys_[i], unprotected_elems[i]);
+    ASSERT_EQ(evicted_values_[i], unprotected_elems[i]);
+  }
+  // The protected elements are unaffected, so this is the same as BasicEvictionOrdering
+  for (int i = 5; i < 100; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-5);
+    ASSERT_EQ(evicted_values_[i], i-5);
+  }
+}
+
+TEST_F(LIRSCacheTest, TombstoneLimit2) {
+  // This tests the enforcement of the tombstone limit. The lirs_tombstone_multiple is
+  // 2.0, so we expect a cache with 100 elements to maintain at most 200 tombstones.
+
+  // Fill the cache to the point where there are 100 normal elements and 200 tombstones.
+  FillCacheToTombstoneLimit();
+
+  // Now, add one final element
+  Insert(300, 300);
+  // Item 295 was evicted and became a tombstone
+  ASSERT_EQ(evicted_keys_[0], 295);
+  ASSERT_EQ(evicted_values_[0], 295);
+  ASSERT_EQ(evicted_keys_.size(), 1);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  // Item 95 was a tombstone, and it should have been removed due to the tombstone limit.
+  // Item 96 was not removed, so when we insert it, it will be protected.
+  Insert(96, 96);
+  ASSERT_EQ(evicted_keys_.size(), 1);
+  ASSERT_EQ(evicted_keys_[0], 296);
+  ASSERT_EQ(evicted_values_[0], 296);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  FlushCache();
+
+  // The 5 unprotected elements are 297, 298, 299, 300, 0 (pushed out of protected)
+  vector<int> unprotected_elems = {297, 298, 299, 300, 0};
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_EQ(evicted_keys_[i], unprotected_elems[i]);
+    ASSERT_EQ(evicted_values_[i], unprotected_elems[i]);
+  }
+  // 96 was added as a protected element, so evicted indices 5-98 hold keys 1-94 and
+  // index 99 holds key 96.
+  for (int i = 5; i < 99; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-4);
+    ASSERT_EQ(evicted_values_[i], i-4);
+  }
+  ASSERT_EQ(evicted_keys_[99], 96);
+  ASSERT_EQ(evicted_values_[99], 96);
+}
+
+TEST_F(LIRSCacheTest, TombstoneLimitFreeMultiple) {
+  // This tests the enforcement of the tombstone limit. This is a simple test that
+  // verifies we can free multiple tombstones at once.
+
+  // Fill the cache to the point where there are 100 normal elements and 200 tombstones.
+  FillCacheToTombstoneLimit();
+
+  // Inserting an unprotected element with a large size does two things. First, it
+  // evicts all the current unprotected and makes them tombstones. Second, the number
+  // of non-tombstones shrank, so this will free multiple tombstones.
+  // This test is primarily focused on not crashing.
+  ASSERT_TRUE(Insert(500, 500, 5));
+  ASSERT_EQ(evicted_keys_.size(), 5);
+  ASSERT_EQ(evicted_values_.size(), 5);
+}
+
+} // namespace impala
diff --git a/be/src/util/cache/lirs-cache.cc b/be/src/util/cache/lirs-cache.cc
new file mode 100644
index 0000000..de8f874
--- /dev/null
+++ b/be/src/util/cache/lirs-cache.cc
@@ -0,0 +1,987 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/cache/cache.h"
+#include "util/cache/cache-internal.h"
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include <boost/atomic/atomic.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/list_hook.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "common/status.h"
+#include "gutil/mathlimits.h"
+#include "gutil/strings/substitute.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/slice.h"
+
+DECLARE_double(cache_memtracker_approximation_ratio);
+DEFINE_double_hidden(lirs_unprotected_percentage, 5.00,
+    "Percentage of the LIRS cache used for unprotected entries. This must be between "
+    "0.0 and 50.0.");
+DEFINE_double_hidden(lirs_tombstone_multiple, 2.00,
+    "Multiple of tombstone entries to resident entries for LIRS cache. "
+    "If there are 100 normal entries, a multiple of 2.0 would allow 200 tombstones. "
+    "This is required to be at least 0.50.");
+
+using std::string;
+using std::vector;
+
+using boost::intrusive::member_hook;
+using boost::intrusive::list_member_hook;
+
+using kudu::Slice;
+
+using strings::Substitute;
+
+namespace impala {
+
+namespace {
+
+typedef kudu::simple_spinlock MutexType;
+
+// This implements the Low Inter-reference Recency Set (LIRS) algorithm described in
+// "LIRS: An Efficient Low Inter-reference Recency Set Replacement Policy to Improve
+//  Buffer Cache Performance" Song Jiang and Xiaodong Zhang 2002.
+//
+// LIRS is based on a combination of recency and reuse distance. Recency is the number
+// of other accesses since the last access to a particular key. Reuse distance is
+// the number of accesses between the last access and the second to last access to a
+// particular key. i.e. If we are now at access a2 and the last two accesses were at a0
+// and a1, the reuse distance is a1-a0 and the recency is a2-a1:
+// a0------ reuse distance ---------a1-------------- recency ---------------- a2 (now)
+// If the key has only been accessed once, its reuse distance is considered infinite.
+// Recency provides information about what the next reuse distance will be.
+// The intuition is that entries with a shorter reuse distance are more likely to be
+// revisited than ones with longer reuse distance.
+//
+// LIRS partitions the cache between two categories: LIR (low interreference recency)
+// and HIR (high interreference recency), and it has three different types of entries:
+// LIR, HIR resident, HIR nonresident. For clarity, these are renamed to be
+// PROTECTED (LIR), UNPROTECTED (HIR resident), and TOMBSTONE (HIR nonresident).
+// PROTECTED entries have been seen more than once recently and are "protected" from
+// eviction. A PROTECTED entry is only evicted after it transitions to being UNPROTECTED.
+// UNPROTECTED entries are new or infrequently accessed and can be evicted directly.
+// PROTECTED and UNPROTECTED entries occupy space in the cache and are accessible via
+// Lookup(). TOMBSTONE entries are metadata-only entries that have already been evicted.
+// They are not accessible via Lookup() and they don't occupy space in cache.
+// The purpose of UNPROTECTED and TOMBSTONE entries is to provide a way to identify when
+// there are new or previously infrequent entries that now have shorter interreference
+// recency than existing PROTECTED entries. If an entry in the UNPROTECTED or TOMBSTONE
+// categories is seen again and has a lower interreference recency than the oldest
+// PROTECTED entry, the UNPROTECTED/TOMBSTONE entry is promoted to PROTECTED and the
+// oldest PROTECTED entry is demoted to UNPROTECTED. Entries that are not in the cache,
+// either because they were never added or have been completely removed, have the
+// UNINITIALIZED state. Every handle starts in UNINITIALIZED and will transition to
+// UNINITIALIZED before being freed.
+//
+// The cache capacity is split between the UNPROTECTED and PROTECTED categories. The size
+// of the UNPROTECTED category is determined by the lirs_unprotected_percentage
+// parameter, which is typically in the 1-5% range. All remaining space is used for
+// PROTECTED entries.
+//
+// The implementation uses two linked lists:
+// 1. The recency list contains all types of entries in the order in which they were last
+//    seen. The recency list maintains the invariant that the oldest entry is a PROTECTED
+//    entry. If a modification to the recency list results in the oldest entry (or
+//    entries) being UNPROTECTED or TOMBSTONE entries, they are removed. This guarantees
+//    that if an UNPROTECTED or TOMBSTONE entry is seen again while on the recency list,
+//    it has a reuse distance shorter than the oldest entry on the recency list, so it
+//    can be upgraded to a PROTECTED entry (potentially evicting the oldest PROTECTED
+//    entry).
+// 2. The unprotected list contains only UNPROTECTED entries. Something can enter the
+//    unprotected list by being a previously unseen entry, or it can enter the
+//    unprotected list by being demoted from the PROTECTED state. The UNPROTECTED entries
+//    must fit within a fixed percentage of the cache, so when an entry is added to the
+//    unprotected list, other entries may be removed. Entries removed from the unprotected
+//    list are evicted from the cache and become TOMBSTONE entries.
+
+enum LIRSState : uint8_t {
+  UNINITIALIZED = 0,
+  PROTECTED = 1,
+  UNPROTECTED = 2,
+  TOMBSTONE = 3
+};
+
+// LIRS has PROTECTED/UNPROTECTED entries that are resident (i.e. occupying cache
+// capacity) as well as TOMBSTONE entries that are not resident (i.e. do not occupy cache
+// capacity). DataResidency tracks whether an entry is cache resident or not. Entries
+// always start and end as NOT_RESIDENT. Insert() transitions an entry from NOT_RESIDENT
+// to RESIDENT. When an entry is evicted, it transitions to EVICTING, runs the eviction
+// callback, then transitions to NOT_RESIDENT.
+//
+// This is distinct from the LIRSState, because eviction can only happen if there are no
+// external references. A TOMBSTONE entry (which should be evicted) can still be RESIDENT
+// if there is an external reference that has not been released. See the description of
+// AtomicState for how DataResidency is used to synchronize eviction between threads.
+enum DataResidency : uint8_t {
+  NOT_RESIDENT = 0,
+  RESIDENT = 1,
+  EVICTING = 2,
+};
+
+// The AtomicState contains all the state for an entry interacting with the LIRS
+// cache. A large number of the transitions operate on a single field in the
+// struct (incrementing the reference count, changing the state, etc.), and they occur
+// while holding the cache-wide mutex. For example, the LIRSState transitions always
+// hold the mutex, and ref_count is only incremented while holding the mutex. This is
+// an atomic struct to allow some codepaths such as Release() and CleanupThreadState()
+// to avoid getting a mutex. Residency can be manipulated and ref_count can be
+// decremented without holding a mutex.
+//
+// All of the interesting state transitions involve reaching agreement about which thread
+// should evict and/or free an entry.
+//
+// Due to the existence of TOMBSTONE entries, eviction and free are now separate
+// actions. For eviction, every codepath that is modifying the atomic state to an
+// evictable state must determine if it should perform the eviction. The criterion
+// for eviction is that (state = TOMBSTONE or state = UNINITIALIZED), ref_count = 0, and
+// residency = RESIDENT. If a thread is going to reach this state, it must instead set
+// residency = EVICTING and perform the eviction. Setting residency = EVICTING makes
+// it impossible for a different thread to reach an evictable state, so eviction will
+// happen exactly once.
+//
+// Which thread frees an entry is determined by which thread reaches
+// state = UNINITIALIZED, ref_count = 0, residency = NOT_RESIDENT. The scenario that this
+// is designed to solve is this:
+// Entry starts with state = TOMBSTONE, ref_count = 1, resident = RESIDENT
+// Thread 1: calls Release(), decrementing ref_count to 0 and setting resident = EVICTING
+// Thread 1: starts running CleanupThreadState()
+// Thread 2: starts running ToUninitialized()
+// There is a race between Thread 1 and Thread 2.
+// If Thread 1 sets resident = NOT_RESIDENT first, Thread 2 performs free.
+// If Thread 2 sets state = UNINITIALIZED first, Thread 1 performs free.
+// Eviction is not assumed to be fast or instantaneous, so it is useful that Thread 2
+// does not need to wait for Thread 1 to complete eviction.
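+//
+// Restating that race in terms of the (state, ref_count, residency) tuple (an
+// illustrative trace, not additional behavior):
+//   start:                       (TOMBSTONE,     1, RESIDENT)
+//   Thread 1, Release():         (TOMBSTONE,     0, EVICTING)   -> Thread 1 evicts
+//   Thread 2, ToUninitialized(): (UNINITIALIZED, 0, EVICTING)
+//   Thread 1, eviction finished: (UNINITIALIZED, 0, NOT_RESIDENT) -> Thread 1 frees
+// In the opposite interleaving, Thread 1 reaches NOT_RESIDENT before Thread 2 sets
+// UNINITIALIZED, and Thread 2 performs the free instead.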
+//
+// This struct is 32 bits so that it supports simple atomic operations and so that
+// AtomicStateTransition is 64 bits.
+struct AtomicState {
+  // 'ref_count' is a count of the number of external references (i.e. handles that
+  // have been returned from Lookup() or Insert() but have not yet called Release()).
+  // This is not specific to LIRS, every cache algorithm needs an equivalent count.
+  // This is strictly external references. LIRS does not count itself as a reference,
+  // because it has a separate state field to track this. So, if there are no external
+  // references, this is zero. In order for an entry to be evicted or freed, the
+  // ref_count must be zero.
+  uint16_t ref_count;
+  LIRSState state;
+  DataResidency residency;
+};
+static_assert(sizeof(AtomicState) == 4, "AtomicState has unexpected size");
+
+// Struct to represent a successful atomic transition with the before and after values
+struct AtomicStateTransition {
+  AtomicStateTransition(AtomicState x, AtomicState y)
+    : before(x), after(y) {}
+  AtomicState before;
+  AtomicState after;
+};
+static_assert(sizeof(AtomicStateTransition) == 8,
+              "AtomicStateTransition has unexpected size");
+
+class LIRSHandle : public HandleBase {
+ public:
+  LIRSHandle(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, int charge)
+    : HandleBase(kv_ptr, key, hash, val_len, charge) {
+    AtomicState init_state;
+    init_state.ref_count = 0;
+    init_state.state = UNINITIALIZED;
+    init_state.residency = NOT_RESIDENT;
+    atomic_state_.store(init_state);
+  }
+
+  ~LIRSHandle() {
+    AtomicState destruct_state = atomic_state_.load();
+    DCHECK_EQ(destruct_state.state, UNINITIALIZED);
+    DCHECK_EQ(destruct_state.ref_count, 0);
+    DCHECK_EQ(destruct_state.residency, NOT_RESIDENT);
+  }
+
+  Cache::EvictionCallback* eviction_callback_;
+  // Recency list double-link
+  list_member_hook<> recency_list_hook_;
+  // Unprotected list double-link
+  list_member_hook<> unprotected_list_hook_;
+
+  // The boost functionality is the same as the std library functionality. Using boost
+  // solves a linking issue when using UBSAN with dynamic linking.
+  // TODO: Switch back to std library atomics when this works.
+  boost::atomic<AtomicState> atomic_state_;
+
+  // Since compare and swap can fail when there are concurrent modifications, this
+  // wraps some of the boilerplate compare and swap retry logic. This performs
+  // the provided lambda in a loop until the compare and swap succeeds.
+  // The lambda is required to mutate the AtomicState argument passed to it.
+  // This returns a structure containing the old atomic value and new atomic value.
+  AtomicStateTransition modify_atomic_state(std::function<void(AtomicState&)> fn) {
+    AtomicState old_atomic_state = atomic_state_.load();
+    AtomicState new_atomic_state = old_atomic_state;
+    while (true) {
+      new_atomic_state = old_atomic_state;
+      // fn mutates new_atomic_state and must change something
+      fn(new_atomic_state);
+      DCHECK(old_atomic_state.ref_count != new_atomic_state.ref_count ||
+             old_atomic_state.state != new_atomic_state.state ||
+             old_atomic_state.residency != new_atomic_state.residency);
+      if (atomic_state_.compare_exchange_weak(old_atomic_state, new_atomic_state)) {
+        break;
+      }
+    }
+    return AtomicStateTransition(old_atomic_state, new_atomic_state);
+  }
+
+  LIRSState state() const {
+    return atomic_state_.load().state;
+  }
+
+  // The caller must be holding the mutex_.
+  void set_state(LIRSState old_state, LIRSState new_state) {
+    modify_atomic_state([old_state, new_state](AtomicState& cur_state) {
+        DCHECK_EQ(cur_state.state, old_state);
+        cur_state.state = new_state;
+      });
+  }
+
+  DataResidency residency() const {
+    return atomic_state_.load().residency;
+  }
+
+  void set_resident() {
+    modify_atomic_state([](AtomicState& cur_state) {
+        DCHECK_EQ(cur_state.residency, NOT_RESIDENT);
+        cur_state.residency = RESIDENT;
+      });
+  }
+
+  uint16_t num_references() const {
+    return atomic_state_.load().ref_count;
+  }
+
+  // The caller must be holding the mutex_.
+  void get_reference() {
+    modify_atomic_state([](AtomicState& cur_state) {
+        // Check whether the reference count would wrap. There is no use case that
+        // currently needs to exceed 65k concurrent references, and this is likely
+        // to be a bug in the caller's code. For now, this asserts, but there may be
+        // a better behavior if needed.
+        CHECK_LT(cur_state.ref_count, std::numeric_limits<uint16_t>::max());
+        ++cur_state.ref_count;
+      });
+  }
+};
+
+struct LIRSThreadState {
+  // Handles to evict (must already be in TOMBSTONE or UNINITIALIZED state), because
+  // eviction is done without holding a mutex.
+  vector<LIRSHandle*> handles_to_evict;
+  // Handles to free (must already be in the UNINITIALIZED state and removed from
+  // structures).
+  vector<LIRSHandle*> handles_to_free;
+};
+
+// Boost intrusive lists allow the handle to easily be on multiple lists simultaneously.
+// Each hook is equivalent to a prev/next pointer.
+typedef member_hook<LIRSHandle, list_member_hook<>,
+                    &LIRSHandle::recency_list_hook_> RecencyListOption;
+typedef boost::intrusive::list<LIRSHandle, RecencyListOption> RecencyList;
+
+typedef member_hook<LIRSHandle, list_member_hook<>,
+                    &LIRSHandle::unprotected_list_hook_> UnprotectedListOption;
+typedef boost::intrusive::list<LIRSHandle, UnprotectedListOption> UnprotectedList;
+
+// A single shard of sharded cache.
+class LIRSCacheShard : public CacheShard {
+ public:
+  explicit LIRSCacheShard(kudu::MemTracker* tracker, size_t capacity);
+  ~LIRSCacheShard();
+
+  Status Init() override;
+  HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) override;
+  void Free(HandleBase* handle) override;
+  HandleBase* Insert(HandleBase* handle,
+      Cache::EvictionCallback* eviction_callback) override;
+  HandleBase* Lookup(const Slice& key, uint32_t hash, bool no_updates) override;
+  void Release(HandleBase* handle) override;
+  void Erase(const Slice& key, uint32_t hash) override;
+  size_t Invalidate(const Cache::InvalidationControl& ctl) override;
+
+ private:
+
+  // Update the memtracker's consumption by the given amount.
+  //
+  // This "buffers" the updates locally in 'deferred_consumption_' until the amount
+  // of accumulated delta is more than ~1% of the cache capacity. This improves
+  // performance under workloads with high eviction rates for a few reasons:
+  //
+  // 1) once the cache reaches its full capacity, we expect it to remain there
+  // in steady state. Each insertion is usually matched by an eviction, and unless
+  // the total size of the evicted item(s) is much different than the size of the
+  // inserted item, each eviction event is unlikely to change the total cache usage
+  // much. So, we expect that the accumulated error will mostly remain around 0
+  // and we can avoid propagating changes to the MemTracker at all.
+  //
+  // 2) because the cache implementation is sharded, we do this tracking in a bunch
+  // of different locations, avoiding bouncing cache-lines between cores. By contrast
+  // the MemTracker is a simple integer, so it doesn't scale as well under concurrency.
+  //
+  // Positive delta indicates an increased memory consumption.
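+  //
+  // As a purely illustrative example: with a 256 MiB shard and an approximation
+  // ratio of 0.01 (hypothetical values), up to ~2.5 MiB of deltas accumulate in
+  // 'deferred_consumption_' before being propagated to the MemTracker.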
+  void UpdateMemTracker(int64_t delta);
+
+  // MoveToRecencyListBack takes an entry and makes it the most recent entry in the
+  // recency list.
+  void MoveToRecencyListBack(LIRSThreadState* tstate, LIRSHandle* e,
+      bool expected_in_list=true);
+
+  // TrimRecencyList maintains the invariant that the oldest element in the recency list
+  // is a protected entry. This iterates the recency list from oldest to newest, removing
+  // entries until the oldest entry is a PROTECTED entry.
+  void TrimRecencyList(LIRSThreadState* tstate);
+
+  // This iterates the recency list from oldest to newest, removing TOMBSTONE entries
+  // until the number of TOMBSTONE entries is less than the limit specified by
+  // lirs_tombstone_multiple.
+  void EnforceTombstoneLimit(LIRSThreadState* tstate);
+
+  // This enforces the capacity constraint on the PROTECTED entries. As long as the
+  // protected usage exceeds the protected capacity, it removes the oldest PROTECTED
+  // entry from the recency list and transitions it to an UNPROTECTED entry.
+  void EnforceProtectedCapacity(LIRSThreadState* tstate);
+
+  // This enforces the capacity constraint on the UNPROTECTED entries. As long as the
+  // unprotected usage exceeds the unprotected capacity, it removes the oldest
+  // UNPROTECTED entry from the unprotected list. The removed entry becomes
+  // a TOMBSTONE entry or an UNINITIALIZED entry.
+  void EnforceUnprotectedCapacity(LIRSThreadState* tstate);
+
+  // Enter the cache
+  // UninitializedToProtected always succeeds
+  void UninitializedToProtected(LIRSThreadState* tstate, LIRSHandle* e);
+  // UninitializedToUnprotected can fail when the entry is too large. Returns whether the
+  // entry was successfully added.
+  bool UninitializedToUnprotected(LIRSThreadState* tstate, LIRSHandle* e);
+
+  // Bounce around the cache
+  void UnprotectedToProtected(LIRSThreadState* tstate, LIRSHandle* e);
+  void ProtectedToUnprotected(LIRSThreadState* tstate, LIRSHandle* e);
+  void UnprotectedToTombstone(LIRSThreadState* tstate, LIRSHandle* e);
+
+  // Exit the cache
+  void ToUninitialized(LIRSThreadState* tstate, LIRSHandle* e);
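+
+  // Taken together, the transitions above form (roughly) the following state
+  // machine; "capacity" here means the corresponding protected/unprotected limit:
+  //   UNINITIALIZED --Insert()------------------------> PROTECTED or UNPROTECTED
+  //   UNPROTECTED/TOMBSTONE --seen again on recency list--> PROTECTED
+  //   PROTECTED --protected capacity exceeded---------> UNPROTECTED
+  //   UNPROTECTED --unprotected capacity exceeded-----> TOMBSTONE (or UNINITIALIZED)
+  //   PROTECTED/UNPROTECTED/TOMBSTONE --Erase()/tombstone limit/teardown-->
+  //                                                     UNINITIALIZED, then freed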
+
+  // Other functions move evictions and frees outside the critical section for mutex_
+  // by placing the affected entries on the LIRSThreadState. This function handles the
+  // accumulated evictions/frees. It should be called without holding any locks.
+  void CleanupThreadState(LIRSThreadState* tstate);
+
+  bool initialized_ = false;
+
+  // Capacity is split between protected and unprotected entries. Unprotected entries
+  // make up a much smaller proportion of the cache than protected entries.
+  size_t capacity_;
+  size_t unprotected_capacity_;
+  size_t unprotected_usage_ = 0;
+  size_t num_unprotected_ = 0;
+  size_t protected_capacity_;
+  size_t protected_usage_ = 0;
+  size_t num_protected_ = 0;
+  size_t num_tombstones_ = 0;
+
+  // mutex_ protects the following state.
+  MutexType mutex_;
+
+  // Recency list (called the LIRS stack in the paper)
+  RecencyList recency_list_;
+
+  // Unprotected list (called the HIR list in the paper)
+  UnprotectedList unprotected_list_;
+
+  // Hash table that maps keys to handles.
+  HandleTable table_;
+
+  kudu::MemTracker* mem_tracker_;
+  std::atomic<int64_t> deferred_consumption_ { 0 };
+
+  // Initialized based on capacity_ to ensure an upper bound on the error on the
+  // MemTracker consumption.
+  int64_t max_deferred_consumption_;
+};
+
+LIRSCacheShard::LIRSCacheShard(kudu::MemTracker* tracker, size_t capacity)
+  : capacity_(capacity), mem_tracker_(tracker) {}
+
+LIRSCacheShard::~LIRSCacheShard() {
+  LIRSThreadState tstate;
+  // Start with the recency list.
+  while (!recency_list_.empty()) {
+    LIRSHandle* e = &*recency_list_.begin();
+    DCHECK_NE(e->state(), UNINITIALIZED);
+    DCHECK_EQ(e->num_references(), 0);
+    // This removes it from the recency list and the unprotected list (if appropriate)
+    ToUninitialized(&tstate, e);
+  }
+  // Anything remaining was not on the recency list, so clean up whatever is left
+  // on the unprotected list.
+  while (!unprotected_list_.empty()) {
+    LIRSHandle* e = &*unprotected_list_.begin();
+    DCHECK_EQ(e->state(), UNPROTECTED);
+    DCHECK_EQ(e->num_references(), 0);
+    // This removes it from the unprotected list
+    ToUninitialized(&tstate, e);
+  }
+  CleanupThreadState(&tstate);
+  // Eviction calls UpdateMemTracker() for each handle, but UpdateMemTracker() only
+  // updates the underlying mem_tracker_ if the deferred consumption exceeds the
+  // max deferred consumption. There can be deferred consumption left over here,
+  // so decrement any remaining deferred consumption.
+  mem_tracker_->Consume(deferred_consumption_);
+}
+
+Status LIRSCacheShard::Init() {
+  // To save some implementation complexity, avoid dealing with unprotected
+  // percentage >= 50.00%. This is far outside the range of what is useful for
+  // LIRS.
+  if (!MathLimits<double>::IsFinite(FLAGS_lirs_unprotected_percentage) ||
+      FLAGS_lirs_unprotected_percentage >= 50.0 ||
+      FLAGS_lirs_unprotected_percentage < 0.0) {
+    return Status(Substitute("Misconfigured --lirs_unprotected_percentage: $0. "
+        "Must be between 0 and 50.", FLAGS_lirs_unprotected_percentage));
+  }
+  if (!MathLimits<double>::IsFinite(FLAGS_cache_memtracker_approximation_ratio) ||
+      FLAGS_cache_memtracker_approximation_ratio < 0.0 ||
+      FLAGS_cache_memtracker_approximation_ratio > 1.0) {
+    return Status(Substitute("Misconfigured --cache_memtracker_approximation_ratio: $0. "
+        "Must be between 0 and 1.", FLAGS_cache_memtracker_approximation_ratio));
+  }
+  if (!MathLimits<double>::IsFinite(FLAGS_lirs_tombstone_multiple) ||
+      FLAGS_lirs_tombstone_multiple < 0.5) {
+    return Status(Substitute("Misconfigured --lirs_tombstone_multiple: $0. "
+        "Must be 0.5 or greater.", FLAGS_lirs_tombstone_multiple));
+  }
+  unprotected_capacity_ =
+      static_cast<size_t>((capacity_ * FLAGS_lirs_unprotected_percentage) / 100.00);
+  protected_capacity_ = capacity_ - unprotected_capacity_;
+  CHECK_GE(protected_capacity_, unprotected_capacity_);
+  max_deferred_consumption_ = capacity_ * FLAGS_cache_memtracker_approximation_ratio;
+  initialized_ = true;
+  return Status::OK();
+}
+
+void LIRSCacheShard::UpdateMemTracker(int64_t delta) {
+  int64_t old_deferred = deferred_consumption_.fetch_add(delta);
+  int64_t new_deferred = old_deferred + delta;
+
+  if (new_deferred > max_deferred_consumption_ ||
+      new_deferred < -max_deferred_consumption_) {
+    int64_t to_propagate = deferred_consumption_.exchange(0, std::memory_order_relaxed);
+    mem_tracker_->Consume(to_propagate);
+  }
+}
+
+void LIRSCacheShard::EnforceProtectedCapacity(LIRSThreadState* tstate) {
+  while (protected_usage_ > protected_capacity_) {
+    DCHECK(!recency_list_.empty());
+    // Get pointer to oldest entry and remove it
+    LIRSHandle* oldest = &*recency_list_.begin();
+    // The oldest entry must be protected (i.e. the recency list must be trimmed)
+    DCHECK_EQ(oldest->state(), PROTECTED);
+    ProtectedToUnprotected(tstate, oldest);
+  }
+}
+
+void LIRSCacheShard::TrimRecencyList(LIRSThreadState* tstate) {
+  // This function maintains the invariant that the oldest entry in the list must be
+  // a protected entry. Look at the oldest entry in the list (i.e. the front). If it is
+  // not protected, remove it from the list. If it is a tombstone entry, it needs to be
+  // deleted. Unprotected entries still exist in the unprotected list, so UNPROTECTED
+  // entries should only be removed from the recency list.
+  while (!recency_list_.empty() && recency_list_.front().state() != PROTECTED) {
+    LIRSHandle* oldest = &*recency_list_.begin();
+    if (oldest->state() == TOMBSTONE) {
+      ToUninitialized(tstate, oldest);
+    } else {
+      DCHECK_EQ(oldest->state(), UNPROTECTED);
+      recency_list_.pop_front();
+    }
+  }
+}
+
+void LIRSCacheShard::EnforceTombstoneLimit(LIRSThreadState* tstate) {
+  // If there are a large number of entries that haven't been seen before, the number
+  // of tombstones can grow without bound. This enforces an upper bound on the total
+  // number of tombstones to limit the metadata size. This is defined as a multiple of
+  // the total number of entries in the cache.
+  // TODO: It would help performance to enforce this with some inexactness (i.e. batch
+  // the removals).
+  size_t num_elems = num_unprotected_ + num_protected_;
+  size_t tombstone_limit =
+    static_cast<size_t>(static_cast<double>(num_elems) * FLAGS_lirs_tombstone_multiple);
+  if (num_tombstones_ <= tombstone_limit) return;
+  // Start with the oldest entry in the recency list and iterate towards more recent
+  // entries
+  auto it = recency_list_.begin();
+  while (it != recency_list_.end() && num_tombstones_ > tombstone_limit) {
+    LIRSHandle* e = &*it;
+    if (e->state() == TOMBSTONE) {
+      it = recency_list_.erase(it);
+      ToUninitialized(tstate, e);
+    } else {
+      ++it;
+    }
+  }
+}
+
+void LIRSCacheShard::EnforceUnprotectedCapacity(LIRSThreadState* tstate) {
+  while (unprotected_usage_ > unprotected_capacity_) {
+    // Evict the oldest UNPROTECTED entry from the unprotected list. This transitions it
+    // from UNPROTECTED to a TOMBSTONE entry
+    DCHECK(!unprotected_list_.empty());
+    LIRSHandle* oldest = &*unprotected_list_.begin();
+    if (oldest->recency_list_hook_.is_linked()) {
+      UnprotectedToTombstone(tstate, oldest);
+    } else {
+      // If the oldest entry is not on the recency list, then its reuse distance is
+      // longer than that of the oldest entry on the recency list, so this entry is
+      // deleted rather than becoming a TOMBSTONE.
+      ToUninitialized(tstate, oldest);
+    }
+  }
+}
+
+void LIRSCacheShard::MoveToRecencyListBack(LIRSThreadState* tstate, LIRSHandle *e,
+    bool expected_in_list) {
+  // Remove and add to the back of the list as the most recent element
+  bool in_list = e->recency_list_hook_.is_linked();
+  CHECK(!expected_in_list || in_list);
+  bool need_trim = false;
+  if (in_list) {
+    // Is this the oldest entry in the list (the front)?
+    LIRSHandle* oldest = &*recency_list_.begin();
+    // Invariant: the oldest entry in the list is always a protected entry
+    CHECK_EQ(oldest->state(), PROTECTED);
+    if (oldest == e) {
+      need_trim = true;
+    }
+    recency_list_.erase(recency_list_.iterator_to(*e));
+  }
+  recency_list_.push_back(*e);
+  if (need_trim) {
+    TrimRecencyList(tstate);
+  }
+}
+
+void LIRSCacheShard::UnprotectedToProtected(LIRSThreadState* tstate, LIRSHandle *e) {
+  // This only happens for entries that are on the recency list
+  DCHECK(e->recency_list_hook_.is_linked());
+  // Make this the most recent entry in the recency list
+  MoveToRecencyListBack(tstate, e);
+  // Remove from unprotected list, decrement unprotected usage
+  unprotected_usage_ -= e->charge();
+  --num_unprotected_;
+  ++num_protected_;
+  DCHECK(e->unprotected_list_hook_.is_linked());
+  unprotected_list_.erase(unprotected_list_.iterator_to(*e));
+  e->set_state(UNPROTECTED, PROTECTED);
+  // Increment protected usage
+  protected_usage_ += e->charge();
+  // If necessary, evict old entries from the recency list, protected to unprotected
+  EnforceProtectedCapacity(tstate);
+}
+
+void LIRSCacheShard::ProtectedToUnprotected(LIRSThreadState* tstate, LIRSHandle* e) {
+  DCHECK(!e->unprotected_list_hook_.is_linked());
+  DCHECK(e->recency_list_hook_.is_linked());
+  recency_list_.erase(recency_list_.iterator_to(*e));
+  // Trim the recency list after the removal
+  TrimRecencyList(tstate);
+  // Decrement protected usage
+  protected_usage_ -= e->charge();
+  e->set_state(PROTECTED, UNPROTECTED);
+  // Add to unprotected list as the newest entry
+  unprotected_list_.push_back(*e);
+  // Increment unprotected usage
+  unprotected_usage_ += e->charge();
+  --num_protected_;
+  ++num_unprotected_;
+  // If necessary, evict an entry from the unprotected list
+  EnforceUnprotectedCapacity(tstate);
+}
+
+void LIRSCacheShard::UnprotectedToTombstone(LIRSThreadState* tstate, LIRSHandle* e) {
+  // Decrement unprotected usage
+  DCHECK(e->unprotected_list_hook_.is_linked());
+  DCHECK(e->recency_list_hook_.is_linked());
+  unprotected_list_.erase(unprotected_list_.iterator_to(*e));
+  unprotected_usage_ -= e->charge();
+  --num_unprotected_;
+  ++num_tombstones_;
+  // Set state to TOMBSTONE. If there are currently no references, this thread is
+  // responsible for eviction, so it must also set the residency to EVICTING.
+  auto state_transition = e->modify_atomic_state([](AtomicState& cur_state) {
+      DCHECK_EQ(cur_state.state, UNPROTECTED);
+      DCHECK_EQ(cur_state.residency, RESIDENT);
+      cur_state.state = TOMBSTONE;
+      if (cur_state.ref_count == 0) cur_state.residency = EVICTING;
+    });
+  // Look at the old state. If there are no references, then we can immediately evict.
+  if (state_transition.before.ref_count == 0) {
+    tstate->handles_to_evict.push_back(e);
+  }
+  EnforceTombstoneLimit(tstate);
+}
+
+void LIRSCacheShard::UninitializedToProtected(LIRSThreadState* tstate, LIRSHandle* e) {
+  DCHECK(!e->unprotected_list_hook_.is_linked());
+  DCHECK(!e->recency_list_hook_.is_linked());
+  e->set_state(UNINITIALIZED, PROTECTED);
+  HandleBase* existing = table_.Insert(e);
+  DCHECK(existing == nullptr);
+  MoveToRecencyListBack(tstate, e, false);
+  protected_usage_ += e->charge();
+  ++num_protected_;
+  EnforceProtectedCapacity(tstate);
+}
+
+bool LIRSCacheShard::UninitializedToUnprotected(LIRSThreadState* tstate, LIRSHandle* e) {
+  DCHECK(!e->unprotected_list_hook_.is_linked());
+  DCHECK(!e->recency_list_hook_.is_linked());
+  e->set_state(UNINITIALIZED, UNPROTECTED);
+  HandleBase* existing = table_.Insert(e);
+  DCHECK(existing == nullptr);
+  recency_list_.push_back(*e);
+  unprotected_usage_ += e->charge();
+  ++num_unprotected_;
+  unprotected_list_.push_back(*e);
+  // If necessary, evict an entry from the unprotected list
+  EnforceUnprotectedCapacity(tstate);
+  // There is exactly one failure case:
+  // If the new entry is larger than the unprotected capacity, then it will cause all
+  // unprotected entries to be removed (including itself).
+  if (e->charge() > unprotected_capacity_) {
+    DCHECK(e->state() == UNINITIALIZED || e->state() == TOMBSTONE);
+    DCHECK(unprotected_list_.empty());
+    return false;
+  }
+  // The entry remains in the cache
+  DCHECK_EQ(e->state(), UNPROTECTED);
+  DCHECK(!unprotected_list_.empty());
+  DCHECK(e->unprotected_list_hook_.is_linked());
+  return true;
+}
+
+void LIRSCacheShard::ToUninitialized(LIRSThreadState* tstate, LIRSHandle* e) {
+  DCHECK_NE(e->state(), UNINITIALIZED);
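+  // The caller (e.g. Insert() or Erase()) may have already removed this entry from the
+  // hash table, in which case Remove() returns nullptr here.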
+  LIRSHandle* removed_elem = static_cast<LIRSHandle*>(table_.Remove(e->key(), e->hash()));
+  DCHECK(e == removed_elem || removed_elem == nullptr);
+  if (e->state() == UNPROTECTED) {
+    // Remove from the unprotected list and decrement usage
+    unprotected_list_.erase(unprotected_list_.iterator_to(*e));
+    unprotected_usage_ -= e->charge();
+    --num_unprotected_;
+  } else {
+    DCHECK(!e->unprotected_list_hook_.is_linked());
+  }
+  // Remove from the list (if it is in the list)
+  if (e->recency_list_hook_.is_linked()) {
+    recency_list_.erase(recency_list_.iterator_to(*e));
+  }
+  if (e->state() == PROTECTED) {
+    protected_usage_ -= e->charge();
+    --num_protected_;
+  }
+  if (e->state() == TOMBSTONE) {
+    --num_tombstones_;
+  }
+
+  auto state_transition = e->modify_atomic_state([](AtomicState& cur_state) {
+      cur_state.state = UNINITIALIZED;
+      // If it is not resident, there must be no references.
+      if (cur_state.residency == NOT_RESIDENT || cur_state.residency == EVICTING) {
+        DCHECK_EQ(cur_state.ref_count, 0);
+      }
+      // If this is resident without references, we can start eviction.
+      if (cur_state.residency == RESIDENT && cur_state.ref_count == 0) {
+        cur_state.residency = EVICTING;
+      }
+    });
+
+  if (state_transition.before.residency == RESIDENT &&
+      state_transition.after.residency == EVICTING) {
+    // We did a transition from RESIDENT to EVICTING, so we are responsible for
+    // eviction. Since we know the new state is UNINITIALIZED, we are also responsible
+    // for freeing it, but the eviction code will handle that appropriately.
+    tstate->handles_to_evict.push_back(e);
+  }
+
+  // This was already evicted, so now it just needs to be freed.
+  if (state_transition.before.residency == NOT_RESIDENT) {
+    tstate->handles_to_free.push_back(e);
+  }
+}
+
+HandleBase* LIRSCacheShard::Allocate(Slice key, uint32_t hash, int val_len, int charge) {
+  DCHECK(initialized_);
+  int key_len = key.size();
+  DCHECK_GE(key_len, 0);
+  DCHECK_GE(val_len, 0);
+  DCHECK_GT(charge, 0);
+  if (charge == 0 || charge > protected_capacity_) {
+    return nullptr;
+  }
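+  // The key (padded to pointer alignment) and the value are stored inline in the same
+  // allocation, immediately after the handle.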
+  int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
+  uint8_t* buf = new uint8_t[sizeof(LIRSHandle)
+                             + key_len_padded + val_len]; // the kv_data VLA data
+  int calc_charge =
+    (charge == Cache::kAutomaticCharge) ? kudu::kudu_malloc_usable_size(buf) : charge;
+  uint8_t* kv_ptr = buf + sizeof(LIRSHandle);
+  LIRSHandle* handle = new (buf) LIRSHandle(kv_ptr, key, hash, val_len,
+      calc_charge);
+  return handle;
+}
+
+void LIRSCacheShard::Free(HandleBase* handle) {
+  DCHECK(initialized_);
+  // We allocate the handle as a uint8_t array, then we call a placement new,
+  // which calls the constructor. For symmetry, we call the destructor and then
+  // delete on the uint8_t array.
+  LIRSHandle* h = static_cast<LIRSHandle*>(handle);
+  h->~LIRSHandle();
+  uint8_t* data = reinterpret_cast<uint8_t*>(handle);
+  delete [] data;
+}
+
+HandleBase* LIRSCacheShard::Lookup(const Slice& key, uint32_t hash,
+    bool no_updates) {
+  DCHECK(initialized_);
+  LIRSHandle* e;
+  LIRSThreadState tstate;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = static_cast<LIRSHandle*>(table_.Lookup(key, hash));
+    if (e != nullptr) {
+      CHECK_NE(e->state(), UNINITIALIZED);
+      // If the handle is a TOMBSTONE, Lookup() should pretend the entry doesn't exist.
+      // TOMBSTONE has special treatment in Insert(); no other action is necessary here.
+      if (e->state() == TOMBSTONE) return nullptr;
+      e->get_reference();
+      // If this is a no update lookup, nothing has changed. We can just return the
+      // entry.
+      if (no_updates) return e;
+      switch (e->state()) {
+      case UNINITIALIZED:
+        // Needed to keep clang-tidy happy
+        CHECK(false);
+      case PROTECTED:
+        // The PROTECTED entry is in the recency list, and the only action is to make
+        // it the most recent element in the list. This can result in evictions if it is
+        // currently the oldest entry.
+        MoveToRecencyListBack(&tstate, e);
+        break;
+      case UNPROTECTED:
+        if (e->recency_list_hook_.is_linked()) {
+          // If an UNPROTECTED entry is in the recency list when it is referenced,
+          // then we know that its new reuse distance is shorter than that of the
+          // oldest PROTECTED entry in the list. So, this entry is upgraded from
+          // UNPROTECTED to PROTECTED.
+          UnprotectedToProtected(&tstate, e);
+        } else {
+          // If an UNPROTECTED entry is not on the recency list, then its new reuse
+          // distance is longer than any of the PROTECTED entries. So, it remains
+          // an UNPROTECTED entry, but it should be added back to the recency list and
+          // moved to the back of the unprotected list as its most recent entry.
+          MoveToRecencyListBack(&tstate, e, false);
+          unprotected_list_.erase(unprotected_list_.iterator_to(*e));
+          unprotected_list_.push_back(*e);
+        }
+        break;
+      default:
+        CHECK(false);
+      }
+    }
+  }
+
+  // This is done without holding the mutex for performance reasons.
+  CleanupThreadState(&tstate);
+
+  return e;
+}
+
+void LIRSCacheShard::Release(HandleBase* handle) {
+  DCHECK(initialized_);
+  LIRSHandle* e = static_cast<LIRSHandle*>(handle);
+  LIRSThreadState tstate;
+
+  auto state_transition = e->modify_atomic_state([] (AtomicState& cur_state) {
+      DCHECK_GT(cur_state.ref_count, 0);
+      --cur_state.ref_count;
+      if (cur_state.ref_count == 0) {
+        if (cur_state.state == TOMBSTONE || cur_state.state == UNINITIALIZED) {
+          // An entry cannot be evicted until the reference count is zero, so this
+          // must still be resident. In this circumstance, this thread must do the
+          // eviction.
+          DCHECK_EQ(cur_state.residency, RESIDENT);
+          cur_state.residency = EVICTING;
+        }
+      }
+    });
+  // If we set the residency to EVICTING, then do the eviction. If this is UNINITIALIZED,
+  // this will also free it.
+  if (state_transition.after.residency == EVICTING) {
+    tstate.handles_to_evict.push_back(e);
+    CleanupThreadState(&tstate);
+  }
+}
+
+HandleBase* LIRSCacheShard::Insert(HandleBase* e_in,
+    Cache::EvictionCallback *eviction_callback) {
+  DCHECK(initialized_);
+  LIRSHandle* e = static_cast<LIRSHandle*>(e_in);
+  CHECK_EQ(e->state(), UNINITIALIZED);
+  // Set the remaining LIRSHandle members which were not already set in the constructor.
+  e->eviction_callback_ = eviction_callback;
+  // The caller has already stored the value in the handle (and any underlying storage).
+  // Whether the Insert() is ultimately successful or not, this handle is currently
+  // resident.
+  e->set_resident();
+
+  // Insert() can fail. A failed insert is equivalent to a successful insert followed
+  // by an immediate eviction of that entry, so much of the setup code is identical.
+  bool success = true;
+  LIRSThreadState tstate;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    // Cases:
+    // 1. There is no existing entry.
+    // 2. There is a tombstone entry.
+    // 3. There is a non-tombstone entry (rare)
+    // In any case, the existing entry will be replaced, so go ahead and remove it.
+    LIRSHandle* existing_entry =
+        static_cast<LIRSHandle*>(table_.Remove(e->key(), e->hash()));
+
+    // The entry is always resident at the start of Insert(), so incorporate this
+    // entry's charge into the memtracker. If Insert() fails, this is decremented as
+    // part of eviction.
+    UpdateMemTracker(e->charge());
+    // All of these paths can result in evictions
+    if (existing_entry != nullptr) {
+      // Priorities are modified in Lookup(), not in Insert(). The exception is
+      // TOMBSTONE entries, which Lookup() does not modify.
+      // So:
+      // 1. TOMBSTONE becomes PROTECTED.
+      // 2. PROTECTED remains PROTECTED.
+      // 3. UNPROTECTED remains UNPROTECTED.
+      LIRSState existing_state = existing_entry->state();
+      ToUninitialized(&tstate, existing_entry);
+      if (existing_state == PROTECTED || existing_state == TOMBSTONE) {
+        UninitializedToProtected(&tstate, e);
+      } else {
+        DCHECK_EQ(existing_state, UNPROTECTED);
+        // UninitializedToUnprotected can fail (i.e. the entry inserted may not remain
+        // in the cache).
+        success = UninitializedToUnprotected(&tstate, e);
+      }
+    } else if (protected_usage_ + e->charge() <= protected_capacity_) {
+      // There is available space in the protected area, so add it directly there.
+      UninitializedToProtected(&tstate, e);
+    } else {
+      // UninitializedToUnprotected can fail (i.e. the entry inserted may not remain in
+      // the cache).
+      success = UninitializedToUnprotected(&tstate, e);
+    }
+    // If success=false, the entry is scheduled for eviction and won't be returned.
+    if (success) {
+      // Increment reference, since this will be returned.
+      e->get_reference();
+    }
+  }
+
+  // This is done without holding the mutex for performance reasons.
+  CleanupThreadState(&tstate);
+
+  return (success ? e : nullptr);
+}
+
+void LIRSCacheShard::CleanupThreadState(LIRSThreadState* tstate) {
+  // Evict the handles that this thread became responsible for evicting.
+  for (LIRSHandle* e : tstate->handles_to_evict) {
+    DCHECK_EQ(e->residency(), EVICTING);
+    DCHECK_EQ(e->num_references(), 0);
+    if (e->eviction_callback_) {
+      e->eviction_callback_->EvictedEntry(e->key(), e->value());
+    }
+    UpdateMemTracker(-static_cast<int64_t>(e->charge()));
+    // The eviction is done, set the residency back to NOT_RESIDENT
+    auto state_transition = e->modify_atomic_state([] (AtomicState& cur_state) {
+        DCHECK_EQ(cur_state.residency, EVICTING);
+        DCHECK_EQ(cur_state.ref_count, 0);
+        cur_state.residency = NOT_RESIDENT;
+      });
+    // If the handle transitioned to UNINITIALIZED before we set it to NOT_RESIDENT,
+    // then we are responsible for freeing it.
+    if (state_transition.before.state == UNINITIALIZED) {
+      Free(e);
+    }
+  }
+  tstate->handles_to_evict.clear();
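+  // Free handles that were already evicted and have since transitioned to UNINITIALIZED.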
+  for (LIRSHandle* e : tstate->handles_to_free) {
+    Free(e);
+  }
+  tstate->handles_to_free.clear();
+}
+
+void LIRSCacheShard::Erase(const Slice& key, uint32_t hash) {
+  DCHECK(initialized_);
+  LIRSThreadState tstate;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    LIRSHandle* e = static_cast<LIRSHandle*>(table_.Remove(key, hash));
+    if (e != nullptr) {
+      // Transition from any state to UNINITIALIZED. There may be external references, so
+      // this might not actually free it immediately.
+      ToUninitialized(&tstate, e);
+    }
+  }
+
+  // This is done without holding the mutex for performance reasons.
+  CleanupThreadState(&tstate);
+}
+
+size_t LIRSCacheShard::Invalidate(const Cache::InvalidationControl& ctl) {
+  DCHECK(initialized_);
+  DCHECK(false) << "Invalidate() is not implemented for LIRS";
+  return 0;
+}
+
+}  // end anonymous namespace
+
+template<>
+CacheShard* NewCacheShardInt<Cache::EvictionPolicy::LIRS>(kudu::MemTracker* mem_tracker,
+    size_t capacity) {
+  return new LIRSCacheShard(mem_tracker, capacity);
+}
+
+}  // namespace impala
diff --git a/be/src/util/cache/rl-cache-test.cc b/be/src/util/cache/rl-cache-test.cc
new file mode 100644
index 0000000..ec6d48c
--- /dev/null
+++ b/be/src/util/cache/rl-cache-test.cc
@@ -0,0 +1,258 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "util/cache/cache.h"
+#include "util/cache/cache-test.h"
+
+#include <cstring>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/slice.h"
+
+using std::make_tuple;
+using std::tuple;
+using strings::Substitute;
+
+namespace impala {
+
+// The Invalidate operation is currently only supported by the RLCacheShard
+// implementation.
+class CacheInvalidationTest :
+    public CacheBaseTest,
+    public ::testing::WithParamInterface<tuple<Cache::EvictionPolicy, ShardingPolicy>> {
+ public:
+  CacheInvalidationTest()
+      : CacheBaseTest(16 * 1024 * 1024) {
+  }
+
+  void SetUp() override {
+    const auto& param = GetParam();
+    SetupWithParameters(std::get<0>(param),
+                        std::get<1>(param));
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(
+    CacheTypes, CacheInvalidationTest,
+    ::testing::Values(
+        make_tuple(Cache::EvictionPolicy::FIFO,
+                   ShardingPolicy::MultiShard),
+        make_tuple(Cache::EvictionPolicy::FIFO,
+                   ShardingPolicy::SingleShard),
+        make_tuple(Cache::EvictionPolicy::LRU,
+                   ShardingPolicy::MultiShard),
+        make_tuple(Cache::EvictionPolicy::LRU,
+                   ShardingPolicy::SingleShard)));
+
+TEST_P(CacheInvalidationTest, InvalidateAllEntries) {
+  constexpr const int kEntriesNum = 1024;
+  // This scenario assumes the cache capacity is large enough that no evictions occur.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({}));
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+  // Remove a few entries from the cache (sparse pattern of keys).
+  constexpr const int kSparseKeys[] = {1, 100, 101, 500, 501, 512, 999, 1001};
+  for (const auto key : kSparseKeys) {
+    Erase(key);
+  }
+  ASSERT_EQ(ARRAYSIZE(kSparseKeys), evicted_keys_.size());
+
+  // All inserted entries, except for the removed ones, should be invalidated.
+  ASSERT_EQ(kEntriesNum - ARRAYSIZE(kSparseKeys), cache_->Invalidate({}));
+  // In the end, no entries should be left in the cache.
+  ASSERT_EQ(kEntriesNum, evicted_keys_.size());
+}
+
+TEST_P(CacheInvalidationTest, InvalidateNoEntries) {
+  constexpr const int kEntriesNum = 10;
+  // This scenario assumes the cache capacity is large enough that no evictions occur.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::ValidityFunc func = [](Slice /* key */, Slice /* value */) {
+    return true;
+  };
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+
+  // No entries should be invalidated since the validity function considers
+  // all entries valid.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+  ASSERT_TRUE(evicted_keys_.empty());
+}
+
+TEST_P(CacheInvalidationTest, InvalidateNoEntriesNoAdvanceIterationFunctor) {
+  constexpr const int kEntriesNum = 256;
+  // This scenario assumes the cache capacity is large enough that no evictions occur.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::InvalidationControl ctl = {
+    Cache::kInvalidateAllEntriesFunc,
+    [](size_t /* valid_entries_count */, size_t /* invalid_entries_count */) {
+      // Never advance over the item list.
+      return false;
+    }
+  };
+
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate(ctl));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+
+  // No entries should be invalidated since the iteration functor doesn't
+  // advance over the list of entries, even if every entry is declared invalid.
+  ASSERT_EQ(0, cache_->Invalidate(ctl));
+  // In the end, all entries should be in the cache.
+  ASSERT_EQ(0, evicted_keys_.size());
+}
+
+TEST_P(CacheInvalidationTest, InvalidateOddKeyEntries) {
+  constexpr const int kEntriesNum = 64;
+  // This scenario assumes the cache capacity is large enough that no evictions occur.
+  ASSERT_LE(kEntriesNum, cache_size());
+
+  const Cache::ValidityFunc func = [](Slice key, Slice /* value */) {
+    return DecodeInt(key) % 2 == 0;
+  };
+  // Running invalidation on empty cache should yield no invalidated entries.
+  ASSERT_EQ(0, cache_->Invalidate({ func }));
+
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    Insert(i, i);
+  }
+  ASSERT_EQ(kEntriesNum / 2, cache_->Invalidate({ func }));
+  ASSERT_EQ(kEntriesNum / 2, evicted_keys_.size());
+  for (auto i = 0; i < kEntriesNum; ++i) {
+    if (i % 2 == 0) {
+      ASSERT_EQ(i,  Lookup(i));
+    } else {
+      ASSERT_EQ(-1,  Lookup(i));
+    }
+  }
+}
+
+// This class is dedicated to scenarios specific to the FIFO cache.
+// The scenarios use a single-shard cache for simpler logic.
+class FIFOCacheTest : public CacheBaseTest {
+ public:
+  FIFOCacheTest()
+      : CacheBaseTest(10 * 1024) {
+  }
+
+  void SetUp() override {
+    SetupWithParameters(Cache::EvictionPolicy::FIFO,
+                        ShardingPolicy::SingleShard);
+  }
+};
+
+// Verify the eviction behavior of a FIFO cache.
+TEST_F(FIFOCacheTest, EvictionPolicy) {
+  static constexpr int kNumElems = 20;
+  const int size_per_elem = cache_size() / kNumElems;
+  // First data chunk: fill the cache up to the capacity.
+  int idx = 0;
+  do {
+    Insert(idx, idx, size_per_elem);
+    // Keep looking up the very first entry: this is to make sure lookups
+    // do not affect the recency criteria of the eviction policy for FIFO cache.
+    Lookup(0);
+    ++idx;
+  } while (evicted_keys_.empty());
+  ASSERT_GT(idx, 1);
+
+  // Make sure the earliest inserted entry was evicted.
+  ASSERT_EQ(-1, Lookup(0));
+
+  // Verify that the 'empirical' capacity matches the expected capacity
+  // (it's a single-shard cache).
+  const int capacity = idx - 1;
+  ASSERT_EQ(kNumElems, capacity);
+
+  // Second data chunk: add (capacity / 2) more elements.
+  for (int i = 1; i < capacity / 2; ++i) {
+    // Earlier inserted elements should be gone one-by-one as new elements are
+    // inserted, and lookups should not affect the recency criteria of the FIFO
+    // eviction policy.
+    ASSERT_EQ(i, Lookup(i));
+    Insert(capacity + i, capacity + i, size_per_elem);
+    ASSERT_EQ(capacity + i, Lookup(capacity + i));
+    ASSERT_EQ(-1, Lookup(i));
+  }
+  ASSERT_EQ(capacity / 2, evicted_keys_.size());
+
+  // Early inserted elements from the first chunk should be evicted
+  // to accommodate the elements from the second chunk.
+  for (int i = 0; i < capacity / 2; ++i) {
+    SCOPED_TRACE(Substitute("early inserted elements: index $0", i));
+    ASSERT_EQ(-1, Lookup(i));
+  }
+  // The later inserted elements from the first chunk should be still
+  // in the cache.
+  for (int i = capacity / 2; i < capacity; ++i) {
+    SCOPED_TRACE(Substitute("late inserted elements: index $0", i));
+    ASSERT_EQ(i, Lookup(i));
+  }
+}
+
+class LRUCacheTest :
+    public CacheBaseTest,
+    public ::testing::WithParamInterface<ShardingPolicy> {
+ public:
+  LRUCacheTest()
+      : CacheBaseTest(16 * 1024 * 1024) {
+  }
+
+  void SetUp() override {
+    const auto& param = GetParam();
+    SetupWithParameters(Cache::EvictionPolicy::LRU,
+                        param);
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(
+    CacheTypes, LRUCacheTest,
+    ::testing::Values(ShardingPolicy::MultiShard,
+                      ShardingPolicy::SingleShard));
+
+TEST_P(LRUCacheTest, EvictionPolicy) {
+  static constexpr int kNumElems = 1000;
+  const int size_per_elem = cache_size() / kNumElems;
+
+  Insert(100, 101);
+  Insert(200, 201);
+
+  // Loop adding and looking up new entries, but repeatedly accessing key 100.
+  // This frequently-used entry should not be evicted. The loop also looks up key 200,
+  // but that lookup uses NO_UPDATE, so key 200 is not preserved.
+  for (int i = 0; i < kNumElems + 1000; ++i) {
+    Insert(1000+i, 2000+i, size_per_elem);
+    ASSERT_EQ(2000+i, Lookup(1000+i));
+    ASSERT_EQ(101, Lookup(100));
+    int entry200val = Lookup(200, Cache::NO_UPDATE);
+    ASSERT_TRUE(entry200val == -1 || entry200val == 201);
+  }
+  ASSERT_EQ(101, Lookup(100));
+  // Since '200' was accessed using NO_UPDATE in the loop above, it should have
+  // been evicted.
+  ASSERT_EQ(-1, Lookup(200));
+}
+
+}  // namespace impala
diff --git a/be/src/util/cache/cache.cc b/be/src/util/cache/rl-cache.cc
similarity index 81%
copy from be/src/util/cache/cache.cc
copy to be/src/util/cache/rl-cache.cc
index c9f97a7..30cfe4e 100644
--- a/be/src/util/cache/cache.cc
+++ b/be/src/util/cache/rl-cache.cc
@@ -17,14 +17,16 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
-#include "kudu/gutil/bits.h"
-#include "kudu/gutil/hash/city.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/sysinfo.h"
+#include "common/status.h"
+#include "gutil/bits.h"
+#include "gutil/hash/city.h"
+#include "gutil/macros.h"
+#include "gutil/mathlimits.h"
+#include "gutil/port.h"
+#include "gutil/ref_counted.h"
+#include "gutil/stl_util.h"
+#include "gutil/strings/substitute.h"
+#include "gutil/sysinfo.h"
 #include "kudu/util/alignment.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
@@ -33,9 +35,6 @@
 #include "kudu/util/slice.h"
 #include "util/cache/cache-internal.h"
 
-// Useful in tests that require accurate cache capacity accounting.
-DECLARE_bool(cache_force_single_shard);
-
 DECLARE_double(cache_memtracker_approximation_ratio);
 
 using std::atomic;
@@ -44,20 +43,9 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
-namespace impala {
-
-Cache::~Cache() {
-}
+using strings::Substitute;
 
-const Cache::ValidityFunc Cache::kInvalidateAllEntriesFunc = [](
-    Slice /* key */, Slice /* value */) {
-  return false;
-};
-
-const Cache::IterationFunc Cache::kIterateOverAllEntriesFunc = [](
-    size_t /* valid_entries_num */, size_t /* invalid_entries_num */) {
-  return true;
-};
+namespace impala {
 
 namespace {
 
@@ -69,7 +57,12 @@ namespace {
 class RLHandle : public HandleBase {
 public:
   RLHandle(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, int charge)
-    : HandleBase(kv_ptr, key, hash, val_len, charge) {}
+    : HandleBase(kv_ptr, key, hash, val_len, charge) {
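+    // The handle is placement-new'ed into a raw byte buffer, so initialize these
+    // members explicitly.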
+    refs.store(0);
+    next = nullptr;
+    prev = nullptr;
+    eviction_callback = nullptr;
+  }
 
   RLHandle()
     : HandleBase(nullptr, Slice(), 0, 0, 0) {}
@@ -88,15 +81,10 @@ class RLCacheShard : public CacheShard {
       "RLCacheShard only supports LRU or FIFO");
 
  public:
-  explicit RLCacheShard(kudu::MemTracker* tracker);
+  explicit RLCacheShard(kudu::MemTracker* tracker, size_t capacity);
   ~RLCacheShard();
 
-  // Separate from constructor so caller can easily make an array of CacheShard
-  void SetCapacity(size_t capacity) override {
-    capacity_ = capacity;
-    max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
-  }
-
+  Status Init() override;
   HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) override;
   void Free(HandleBase* handle) override;
   HandleBase* Insert(HandleBase* handle,
@@ -138,7 +126,8 @@ class RLCacheShard : public CacheShard {
   // Positive delta indicates an increased memory consumption.
   void UpdateMemTracker(int64_t delta);
 
-  // Initialized before use.
+  bool initialized_ = false;
+
   size_t capacity_;
 
   // mutex_ protects the following state.
@@ -160,8 +149,9 @@ class RLCacheShard : public CacheShard {
 };
 
 template<Cache::EvictionPolicy policy>
-RLCacheShard<policy>::RLCacheShard(kudu::MemTracker* tracker)
-  : usage_(0),
+RLCacheShard<policy>::RLCacheShard(kudu::MemTracker* tracker, size_t capacity)
+  : capacity_(capacity),
+    usage_(0),
     mem_tracker_(tracker) {
   // Make empty circular linked list.
   rl_.next = &rl_;
@@ -174,6 +164,7 @@ RLCacheShard<policy>::~RLCacheShard() {
     RLHandle* next = e->next;
     DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1)
         << "caller has an unreleased handle";
+    table_.Remove(e->key(), e->hash());
     if (Unref(e)) {
       FreeEntry(e);
     }
@@ -183,6 +174,19 @@ RLCacheShard<policy>::~RLCacheShard() {
 }
 
 template<Cache::EvictionPolicy policy>
+Status RLCacheShard<policy>::Init() {
+  if (!MathLimits<double>::IsFinite(FLAGS_cache_memtracker_approximation_ratio) ||
+      FLAGS_cache_memtracker_approximation_ratio < 0.0 ||
+      FLAGS_cache_memtracker_approximation_ratio > 1.0) {
+    return Status(Substitute("Misconfigured --cache_memtracker_approximation_ratio: $0. "
+        "Must be between 0 and 1.", FLAGS_cache_memtracker_approximation_ratio));
+  }
+  max_deferred_consumption_ = capacity_ * FLAGS_cache_memtracker_approximation_ratio;
+  initialized_ = true;
+  return Status::OK();
+}
+
+template<Cache::EvictionPolicy policy>
 bool RLCacheShard<policy>::Unref(RLHandle* e) {
   DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
   return e->refs.fetch_sub(1) == 1;
@@ -241,6 +245,7 @@ void RLCacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e)
 template<Cache::EvictionPolicy policy>
 HandleBase* RLCacheShard<policy>::Allocate(Slice key, uint32_t hash, int val_len,
     int charge) {
+  DCHECK(initialized_);
   int key_len = key.size();
   DCHECK_GE(key_len, 0);
   DCHECK_GE(val_len, 0);
@@ -260,6 +265,7 @@ HandleBase* RLCacheShard<policy>::Allocate(Slice key, uint32_t hash, int val_len
 
 template<Cache::EvictionPolicy policy>
 void RLCacheShard<policy>::Free(HandleBase* handle) {
+  DCHECK(initialized_);
   // We allocate the handle as a uint8_t array, then we call a placement new,
   // which calls the constructor. For symmetry, we call the destructor and then
   // delete on the uint8_t array.
@@ -272,14 +278,16 @@ void RLCacheShard<policy>::Free(HandleBase* handle) {
 template<Cache::EvictionPolicy policy>
 HandleBase* RLCacheShard<policy>::Lookup(const Slice& key,
                                          uint32_t hash,
-                                         bool caching) {
+                                         bool no_updates) {
+  DCHECK(initialized_);
   RLHandle* e;
   {
     std::lock_guard<decltype(mutex_)> l(mutex_);
     e = static_cast<RLHandle*>(table_.Lookup(key, hash));
     if (e != nullptr) {
       e->refs.fetch_add(1, std::memory_order_relaxed);
-      RL_UpdateAfterLookup(e);
+      // If this is a no update lookup, skip the modifications.
+      if (!no_updates) RL_UpdateAfterLookup(e);
     }
   }
 
@@ -288,6 +296,7 @@ HandleBase* RLCacheShard<policy>::Lookup(const Slice& key,
 
 template<Cache::EvictionPolicy policy>
 void RLCacheShard<policy>::Release(HandleBase* handle) {
+  DCHECK(initialized_);
   RLHandle* e = static_cast<RLHandle*>(handle);
   bool last_reference = Unref(e);
   if (last_reference) {
@@ -299,6 +308,7 @@ template<Cache::EvictionPolicy policy>
 HandleBase* RLCacheShard<policy>::Insert(
     HandleBase* handle_in,
     Cache::EvictionCallback* eviction_callback) {
+  DCHECK(initialized_);
   RLHandle* handle = static_cast<RLHandle*>(handle_in);
   // Set the remaining RLHandle members which were not already allocated during
   // Allocate().
@@ -346,6 +356,7 @@ HandleBase* RLCacheShard<policy>::Insert(
 
 template<Cache::EvictionPolicy policy>
 void RLCacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
+  DCHECK(initialized_);
   RLHandle* e;
   bool last_reference = false;
   {
@@ -365,6 +376,7 @@ void RLCacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
 
 template<Cache::EvictionPolicy policy>
 size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
+  DCHECK(initialized_);
   size_t invalid_entry_count = 0;
   size_t valid_entry_count = 0;
   RLHandle* to_remove_head = nullptr;
@@ -409,54 +421,16 @@ size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
 
 }  // end anonymous namespace
 
-// Determine the number of bits of the hash that should be used to determine
-// the cache shard. This, in turn, determines the number of shards.
-int DetermineShardBits() {
-  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
-      0 : Bits::Log2Ceiling(base::NumCPUs());
-  VLOG(1) << "Will use " << (1 << bits) << " shards for recency list cache.";
-  return bits;
-}
-
-string ToString(Cache::EvictionPolicy p) {
-  switch (p) {
-    case Cache::EvictionPolicy::FIFO:
-      return "fifo";
-    case Cache::EvictionPolicy::LRU:
-      return "lru";
-    default:
-      LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
-  }
-  return "unknown";
-}
-
 template<>
-Cache* NewCache<Cache::EvictionPolicy::FIFO,
-                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
-  return new ShardedCache<Cache::EvictionPolicy::FIFO>(capacity, id);
+CacheShard* NewCacheShardInt<Cache::EvictionPolicy::FIFO>(kudu::MemTracker* mem_tracker,
+    size_t capacity) {
+  return new RLCacheShard<Cache::EvictionPolicy::FIFO>(mem_tracker, capacity);
 }
 
 template<>
-Cache* NewCache<Cache::EvictionPolicy::LRU,
-                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
-  return new ShardedCache<Cache::EvictionPolicy::LRU>(capacity, id);
-}
-
-template<Cache::EvictionPolicy policy>
-CacheShard* NewCacheShard(kudu::MemTracker* mem_tracker) {
-  return new RLCacheShard<policy>(mem_tracker);
-}
-
-std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
-  switch (mem_type) {
-    case Cache::MemoryType::DRAM:
-      os << "DRAM";
-      break;
-    default:
-      os << "unknown (" << static_cast<int>(mem_type) << ")";
-      break;
-  }
-  return os;
+CacheShard* NewCacheShardInt<Cache::EvictionPolicy::LRU>(kudu::MemTracker* mem_tracker,
+    size_t capacity) {
+  return new RLCacheShard<Cache::EvictionPolicy::LRU>(mem_tracker, capacity);
 }
 
 }  // namespace impala
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 0a4f6c5..32ec350 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -80,8 +80,11 @@ be/src/kudu/security/x509_check_host.h
 be/src/kudu/security/x509_check_host.cc
 be/src/util/cache/cache.h
 be/src/util/cache/cache.cc
-be/src/util/cache/cache-test.cc
 be/src/util/cache/cache-internal.h
+be/src/util/cache/cache-test.h
+be/src/util/cache/cache-test.cc
+be/src/util/cache/rl-cache.cc
+be/src/util/cache/rl-cache-test.cc
 
 # http://www.apache.org/legal/src-headers.html: "Short informational text files; for
 # example README, INSTALL files. The expectation is that these files make it obvious which
diff --git a/tests/custom_cluster/test_data_cache.py b/tests/custom_cluster/test_data_cache.py
index 8814963..e6b5a5f 100644
--- a/tests/custom_cluster/test_data_cache.py
+++ b/tests/custom_cluster/test_data_cache.py
@@ -40,49 +40,77 @@ class TestDataCache(CustomClusterTestSuite):
       pytest.skip('runs only in exhaustive')
     super(TestDataCache, cls).setup_class()
 
+  def get_impalad_args(eviction_policy, high_write_concurrency=True,
+                       force_single_shard=True):
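+    """Build the impalad startup flags for a data cache test using the given eviction
+    policy. High write concurrency and single-shard mode are enabled by default."""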
+    impalad_args = ["--always_use_data_cache=true"]
+    if high_write_concurrency:
+      impalad_args.append("--data_cache_write_concurrency=64")
+    if force_single_shard:
+      impalad_args.append("--cache_force_single_shard")
+    impalad_args.append("--data_cache_eviction_policy={0}".format(eviction_policy))
+    return " ".join(impalad_args)
+
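+  # Common cluster start args: a 500MB data cache stored under /tmp.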
+  CACHE_START_ARGS = "--data_cache_dir=/tmp --data_cache_size=500MB"
+
+  def __test_data_cache_deterministic(self, vector, unique_database):
+    """ This test creates a temporary table from another table, overwrites it with
+    some other data and verifies that no stale data is read from the cache. Runs with
+    a single node to make it easier to verify the runtime profile. Also enables higher
+    write concurrency and uses a single shard to avoid non-determinism.
+    """
+    self.run_test_case('QueryTest/data-cache', vector, unique_database)
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-dropped-bytes') >= 0
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') > 0
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') > 0
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') > 0
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=get_impalad_args("LRU"),
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  def test_data_cache_deterministic_lru(self, vector, unique_database):
+    self.__test_data_cache_deterministic(vector, unique_database)
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--always_use_data_cache=true --data_cache_write_concurrency=64"
-      " --cache_force_single_shard=true",
-      start_args="--data_cache_dir=/tmp --data_cache_size=500MB", cluster_size=1)
-  def test_data_cache_deterministic(self, vector, unique_database):
-      """ This test creates a temporary table from another table, overwrites it with
-      some other data and verifies that no stale data is read from the cache. Runs with
-      a single node to make it easier to verify the runtime profile. Also enables higher
-      write concurrency and uses a single shard to avoid non-determinism.
-      """
-      self.run_test_case('QueryTest/data-cache', vector, unique_database)
-      assert self.get_metric('impala-server.io-mgr.remote-data-cache-dropped-bytes') >= 0
-      assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') > 0
-      assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') > 0
-      assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') > 0
+      impalad_args=get_impalad_args("LIRS"),
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  def test_data_cache_deterministic_lirs(self, vector, unique_database):
+    self.__test_data_cache_deterministic(vector, unique_database)
+
+  def __test_data_cache(self, vector):
+    """ This test scans the same table twice and verifies the cache hit count metrics
+    are correct. The exact number of bytes hit is non-deterministic between runs due
+    to different mtime of files and multiple shards in the cache.
+    """
+    QUERY = "select * from tpch_parquet.lineitem"
+    # Do a first run to warm up the cache. Expect no hits.
+    self.execute_query(QUERY)
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') == 0
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') > 0
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') > 0
+
+    # Do a second run. Expect some hits.
+    self.execute_query(QUERY)
+    assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') > 0
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--always_use_data_cache=true",
-      start_args="--data_cache_dir=/tmp --data_cache_size=500MB", cluster_size=1)
-  def test_data_cache(self, vector):
-      """ This test scans the same table twice and verifies the cache hit count metrics
-      are correct. The exact number of bytes hit is non-deterministic between runs due
-      to different mtime of files and multiple shards in the cache.
-      """
-      QUERY = "select * from tpch_parquet.lineitem"
-      # Do a first run to warm up the cache. Expect no hits.
-      self.execute_query(QUERY)
-      assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') == 0
-      assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') > 0
-      assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') > 0
-
-      # Do a second run. Expect some hits.
-      self.execute_query(QUERY)
-      assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') > 0
+      impalad_args=get_impalad_args("LRU", high_write_concurrency=False,
+                                    force_single_shard=False),
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  def test_data_cache_lru(self, vector):
+    self.__test_data_cache(vector)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--always_use_data_cache=true --data_cache_write_concurrency=64"
-      " --cache_force_single_shard=true",
-      start_args="--data_cache_dir=/tmp --data_cache_size=500MB", cluster_size=1)
-  def test_data_cache_disablement(self, vector):
+      impalad_args=get_impalad_args("LIRS", high_write_concurrency=False,
+                                    force_single_shard=False),
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  def test_data_cache_lirs(self, vector):
+    self.__test_data_cache(vector)
+
+  def __test_data_cache_disablement(self, vector):
     # Verifies that the cache metrics are all zero.
     assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') == 0
     assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') == 0
@@ -99,3 +127,17 @@ class TestDataCache(CustomClusterTestSuite):
             (self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') == 0)
         assert disable_cache ==\
             (self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') == 0)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=get_impalad_args("LRU"),
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  def test_data_cache_disablement_lru(self, vector):
+    self.__test_data_cache_disablement(vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=get_impalad_args("LIRS"),
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  def test_data_cache_disablement_lirs(self, vector):
+    self.__test_data_cache_disablement(vector)