You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/02/27 17:30:14 UTC

[impala] branch master updated: IMPALA-8690 (prep 3): Factor out common code for cache implementations

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 7479be4  IMPALA-8690 (prep 3): Factor out common code for cache implementations
7479be4 is described below

commit 7479be4e18a1821768cd8a921e72405c830eca9c
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Thu Feb 6 14:48:10 2020 -0800

    IMPALA-8690 (prep 3): Factor out common code for cache implementations
    
    The existing cache implementation handles LRU/FIFO eviction algorithms,
    but it also implements several components that are useful for other
    cache implementations. Specifically, there is a simple hash table
    (HandleTable) and a simple sharding implementation (ShardedCache).
    These can be reused across other eviction algorithms by making them
    generic.
    
    This pulls them out of be/src/util/cache/cache.cc and into a new
    be/src/util/cache/cache-internal.h file. To make the HandleTable
    generic, this introduces a HandleBase class that contains common
    code between the implementations (such as the key, the value,
    the hash, etc). The HandleTable works on this base class, and the
    RLHandle now derives from HandleBase.
    
    To support this, Allocate/Free needs to treat the handle as an object
    (calling constructors/destructors) rather than treating it like
    a chunk of memory (or simple struct).
    
    ShardedCache is made generic by having cache implementations derive
    from a base CacheShard class that defines the appropriate methods
    needed by the sharding class. This is purely an interface, and the
    base class defines no functions. The existing CacheShard is renamed
    to RLCacheShard and derives from this class.
    
    Testing:
     - Release core run with a data cache enabled
     - ASAN core
     - The cache-test backend test continues to pass
    
    Change-Id: I67294244a3e8a2812f1482fe786bf7f8e6ce054e
    Reviewed-on: http://gerrit.cloudera.org:8080/15179
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/cache/cache-internal.h | 316 +++++++++++++++++++++++++++++
 be/src/util/cache/cache.cc         | 395 ++++++++++---------------------------
 bin/rat_exclude_files.txt          |   1 +
 3 files changed, 426 insertions(+), 286 deletions(-)

diff --git a/be/src/util/cache/cache-internal.h b/be/src/util/cache/cache-internal.h
new file mode 100644
index 0000000..a2bbacb
--- /dev/null
+++ b/be/src/util/cache/cache-internal.h
@@ -0,0 +1,316 @@
+// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "util/cache/cache.h"
+
+#include <vector>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/slice.h"
+
+using kudu::Slice;
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+
+namespace impala {
+
+// Base class for handles for all algorithms. Sharing the base class allows for a shared
+// hash table (HandleTable) and a shared sharding implementation. HandleBase is designed
+// to allow the entire handle to be stored in a single allocation, even though the key
+// and value can be variable-sized. It does this by taking in a pointer to the key/value
+// pair in the constructor. For a contiguous layout, the key/value pair would be stored
+// immediately following the HandleBase (or subclass) and the caller of the constructor
+// would pass in a pointer to this contiguous data.
+class HandleBase {
+ public:
+  // Create a HandleBase. See Cache::Allocate() for the 'key', 'val_len', and 'charge'
+  // parameters. 'hash' is the hash of the key. 'kv_ptr' points to caller-provided
+  // memory for the key/value pair, laid out as described at kv_data_ptr_. The key bytes
+  // are copied to the start of 'kv_ptr' unless it is null or the key is empty; the
+  // value bytes are not written here. NOTE(review): 'hash' is declared int32_t but is
+  // stored in (and returned as) uint32_t; confirm the signed parameter is intended.
+  HandleBase(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, int charge) {
+    next_handle_ = nullptr;  // not linked into any HandleTable bucket yet
+    key_length_ = key.size();
+    val_length_ = val_len;
+    hash_ = hash;
+    charge_ = charge;
+    kv_data_ptr_ = kv_ptr;
+    if (kv_data_ptr_ != nullptr && key_length_ > 0) {
+      memcpy(kv_data_ptr_, key.data(), key_length_);
+    }
+  }
+
+  Slice key() const {  // Slice over the first key_length_ bytes at kv_data_ptr_
+    return Slice(kv_data_ptr_, key_length_);
+  }
+
+  uint32_t hash() const { return hash_; }  // hash given to the constructor
+
+  uint8_t* mutable_val_ptr() {  // start of the value bytes inside the kv buffer
+    int val_offset = KUDU_ALIGN_UP(key_length_, sizeof(void*));  // skip key + padding
+    return &kv_data_ptr_[val_offset];
+  }
+
+  const uint8_t* val_ptr() const {  // const view of the same address
+    return const_cast<HandleBase*>(this)->mutable_val_ptr();
+  }
+
+  Slice value() const {  // Slice over val_length_ bytes starting at val_ptr()
+    return Slice(val_ptr(), val_length_);
+  }
+
+  size_t charge() const { return charge_; }  // charge given to the constructor
+
+ private:
+  friend class HandleTable;  // HandleTable reads and writes next_handle_
+  HandleBase* next_handle_;  // next entry in the same HandleTable bucket chain
+  uint32_t hash_;
+  uint32_t key_length_;
+  uint32_t val_length_;
+  size_t charge_;
+
+  // kv_data_ptr_ points to the beginning of the key/value pair, which is stored as a
+  // byte array in this format (HandleBase never frees it; it declares no destructor):
+  //   [key bytes ...] [padding up to a sizeof(void*) boundary] [value bytes ...]
+  uint8_t* kv_data_ptr_;
+};
+
+// Chained hash table of HandleBase pointers. It never frees the handles themselves and
+// does no locking of its own. We provide our own simple hash table since it removes a
+// whole bunch of porting hacks and is also faster than some of the built-in hash table
+// implementations in some of the compiler/runtime combinations we have tested. E.g.,
+// readrandom speeds up by ~5% over the g++ 4.4.3's builtin hashtable.
+class HandleTable {
+ public:
+  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
+  ~HandleTable() { delete[] list_; }  // frees only the bucket array
+
+  HandleBase* Lookup(const Slice& key, uint32_t hash) {  // nullptr if absent
+    return *FindPointer(key, hash);
+  }
+
+  HandleBase* Insert(HandleBase* h) {  // returns the replaced entry, or nullptr
+    HandleBase** ptr = FindPointer(h->key(), h->hash());
+    HandleBase* old = *ptr;
+    h->next_handle_ = (old == nullptr ? nullptr : old->next_handle_);  // take old's link
+    *ptr = h;
+    if (old == nullptr) {  // a new key was added rather than replaced
+      ++elems_;
+      if (elems_ > length_) {
+        // Since each cache entry is fairly large, we aim for a small
+        // average linked list length (<= 1).
+        Resize();
+      }
+    }
+    return old;
+  }
+
+  HandleBase* Remove(const Slice& key, uint32_t hash) {  // unlinked entry, or nullptr
+    HandleBase** ptr = FindPointer(key, hash);
+    HandleBase* result = *ptr;
+    if (result != nullptr) {
+      *ptr = result->next_handle_;  // bypass the removed entry in its chain
+      --elems_;
+    }
+    return result;
+  }
+
+ private:
+  // The table consists of an array of buckets where each bucket is a linked list of
+  // cache entries that hash into the bucket. NOTE(review): copy ops are not deleted.
+  uint32_t length_;  // number of buckets in list_
+  uint32_t elems_;  // number of entries currently linked into the table
+  HandleBase** list_;  // bucket heads; allocated in Resize(), freed in the destructor
+
+  // Return a pointer to slot that points to a cache entry that
+  // matches key/hash.  If there is no such cache entry, return a
+  // pointer to the trailing slot in the corresponding linked list.
+  HandleBase** FindPointer(const Slice& key, uint32_t hash) {
+    HandleBase** ptr = &list_[hash & (length_ - 1)];  // length_ is a power of 2
+    while (*ptr != nullptr &&
+           ((*ptr)->hash() != hash || key != (*ptr)->key())) {
+      ptr = &(*ptr)->next_handle_;
+    }
+    return ptr;
+  }
+
+  void Resize() {  // rebuilds the bucket array sized from elems_, relinking all entries
+    uint32_t new_length = 16;  // doubled below: a power of 2 that is >= 1.5 * elems_
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    auto new_list = new HandleBase*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);  // all buckets start empty
+    uint32_t count = 0;
+    for (uint32_t i = 0; i < length_; i++) {
+      HandleBase* h = list_[i];
+      while (h != nullptr) {
+        HandleBase* next = h->next_handle_;  // saved before the link is overwritten
+        uint32_t hash = h->hash();
+        HandleBase** ptr = &new_list[hash & (new_length - 1)];
+        h->next_handle_ = *ptr;  // push onto the head of the new bucket
+        *ptr = h;
+        h = next;
+        count++;
+      }
+    }
+    DCHECK_EQ(elems_, count);  // every tracked entry must have been relinked
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+// This defines a single shared interface for the cache algorithms, which allows them to
+// share the sharding implementation. There are several differences between this interface
+// and the Cache interface:
+// 1. The functions use HandleBase arguments rather than the types defined externally
+//    for the Cache.
+// 2. This does not use separate Handle vs PendingHandle types. Allocate and Free just use
+//    HandleBase like all other functions. At the moment, no implementation uses this
+//    functionality. This is internal code, so the distinction can be reintroduced later
+//    if it is useful.
+// 3. Since the sharding would already need the hash, it passes the computed hash through
+//    for all functions where it makes sense.
+class CacheShard {
+ public:
+  virtual ~CacheShard() {}  // virtual: ShardedCache deletes shards via CacheShard*
+
+  // Separate from constructor so caller can easily make an array of CacheShard
+  virtual void SetCapacity(size_t capacity) = 0;
+
+  virtual HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) = 0;
+  virtual void Free(HandleBase* handle) = 0;  // counterpart of Allocate()
+  virtual HandleBase* Insert(HandleBase* handle,
+      Cache::EvictionCallback* eviction_callback) = 0;
+  virtual HandleBase* Lookup(const Slice& key, uint32_t hash, bool caching) = 0;
+  virtual void Release(HandleBase* handle) = 0;
+  virtual void Erase(const Slice& key, uint32_t hash) = 0;
+  virtual size_t Invalidate(const Cache::InvalidationControl& ctl) = 0;
+};
+
+// Function to build a cache shard using the given eviction algorithm.
+template <Cache::EvictionPolicy eviction_policy>
+CacheShard* NewCacheShard(kudu::MemTracker* mem_tracker);
+
+// Determine the number of bits of the hash that should be used to determine
+// the cache shard. This, in turn, determines the number of shards.
+int DetermineShardBits();
+
+// Helper function to provide a string representation of an eviction policy.
+string ToString(Cache::EvictionPolicy p);
+
+// This is a minimal sharding cache implementation. It passes almost all functions
+// through to the underlying CacheShard unless the function can be answered by the
+// HandleBase itself. The number of shards is currently a power of 2 so that it can
+// do a right shift to get the shard index.
+template <Cache::EvictionPolicy policy>
+class ShardedCache : public Cache {
+ public:
+  explicit ShardedCache(size_t capacity, const string& id)
+      : shard_bits_(DetermineShardBits()) {
+    // A cache is often a singleton, so:
+    // 1. We reuse its MemTracker if one already exists, and
+    // 2. It is directly parented to the root MemTracker.
+    // TODO: Change this to an Impala MemTracker, and only track the memory usage for the
+    // metadata. This currently does not hook up to Impala's MemTracker hierarchy
+    mem_tracker_ = kudu::MemTracker::FindOrCreateGlobalTracker(
+        -1, strings::Substitute("$0-sharded_$1_cache", id, ToString(policy)));
+
+    int num_shards = 1 << shard_bits_;
+    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;  // rounds up
+    for (int s = 0; s < num_shards; s++) {
+      unique_ptr<CacheShard> shard(NewCacheShard<policy>(mem_tracker_.get()));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());  // shards_ holds the raw pointer from here
+    }
+  }
+
+  virtual ~ShardedCache() {
+    STLDeleteElements(&shards_);  // presumably deletes each shard - see stl_util.h
+  }
+
+  UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override {
+    const uint32_t hash = HashSlice(key);  // picks the shard and is passed on to it
+    HandleBase* h = shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE);
+    return UniqueHandle(reinterpret_cast<Cache::Handle*>(h), Cache::HandleDeleter(this));
+  }
+
+  void Erase(const Slice& key) override {
+    const uint32_t hash = HashSlice(key);
+    shards_[Shard(hash)]->Erase(key, hash);
+  }
+
+  Slice Value(const UniqueHandle& handle) const override {  // answered by the handle
+    return reinterpret_cast<HandleBase*>(handle.get())->value();
+  }
+
+  UniqueHandle Insert(UniquePendingHandle handle,
+      Cache::EvictionCallback* eviction_callback) override {
+    HandleBase* h_in = reinterpret_cast<HandleBase*>(DCHECK_NOTNULL(handle.release()));
+    HandleBase* h_out = shards_[Shard(h_in->hash())]->Insert(h_in, eviction_callback);
+    return UniqueHandle(reinterpret_cast<Cache::Handle*>(h_out),
+        Cache::HandleDeleter(this));
+  }
+
+  UniquePendingHandle Allocate(Slice key, int val_len, int charge) override {
+    const uint32_t hash = HashSlice(key);
+    HandleBase* handle = shards_[Shard(hash)]->Allocate(key, hash, val_len, charge);
+    UniquePendingHandle h(reinterpret_cast<PendingHandle*>(handle),
+                          PendingHandleDeleter(this));
+
+    return h;
+  }
+
+  uint8_t* MutableValue(UniquePendingHandle* handle) override {  // no shard involved
+    return reinterpret_cast<HandleBase*>(handle->get())->mutable_val_ptr();
+  }
+
+  size_t Invalidate(const InvalidationControl& ctl) override {  // sum over all shards
+    size_t invalidated_count = 0;
+    for (auto& shard: shards_) {
+      invalidated_count += shard->Invalidate(ctl);
+    }
+    return invalidated_count;
+  }
+
+ protected:
+  void Release(Handle* handle) override {  // routed by the hash stored in the handle
+    HandleBase* h = reinterpret_cast<HandleBase*>(handle);  // Handle* is a HandleBase*
+    shards_[Shard(h->hash())]->Release(h);
+  }
+
+  void Free(PendingHandle* pending_handle) override {  // routed by the stored hash
+    HandleBase* h = reinterpret_cast<HandleBase*>(pending_handle);
+    shards_[Shard(h->hash())]->Free(h);
+  }
+
+ private:
+  shared_ptr<kudu::MemTracker> mem_tracker_;  // passed as a raw pointer to every shard
+  vector<CacheShard*> shards_;  // 1 << shard_bits_ entries, deleted in the destructor
+
+  // Number of bits of hash used to determine the shard.
+  const int shard_bits_;
+
+  static inline uint32_t HashSlice(const Slice& s) {  // CityHash64 narrowed to uint32_t
+    return util_hash::CityHash64(reinterpret_cast<const char *>(s.data()), s.size());
+  }
+
+  uint32_t Shard(uint32_t hash) {  // index = top shard_bits_ bits of the hash
+    // Widen to uint64 before shifting, or else on a single CPU,
+    // we would try to shift a uint32_t by 32 bits, which is undefined.
+    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
+  }
+};
+
+}
diff --git a/be/src/util/cache/cache.cc b/be/src/util/cache/cache.cc
index 75e6ff6..c9f97a7 100644
--- a/be/src/util/cache/cache.cc
+++ b/be/src/util/cache/cache.cc
@@ -31,6 +31,7 @@
 #include "kudu/util/malloc.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/slice.h"
+#include "util/cache/cache-internal.h"
 
 // Useful in tests that require accurate cache capacity accounting.
 DECLARE_bool(cache_force_single_shard);
@@ -65,156 +66,45 @@ namespace {
 // Recency list handle. An entry is a variable length heap-allocated structure.
 // Entries are kept in a circular doubly linked list ordered by some recency
 // criterion (e.g., access time for LRU policy, insertion time for FIFO policy).
-struct RLHandle {
+class RLHandle : public HandleBase {
+public:
+  RLHandle(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, int charge)
+    : HandleBase(kv_ptr, key, hash, val_len, charge) {}
+
+  RLHandle()
+    : HandleBase(nullptr, Slice(), 0, 0, 0) {}
+
   Cache::EvictionCallback* eviction_callback;
-  RLHandle* next_hash;
   RLHandle* next;
   RLHandle* prev;
-  size_t charge;      // TODO(opt): Only allow uint32_t?
-  uint32_t key_length;
-  uint32_t val_length;
   std::atomic<int32_t> refs;
-  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
-
-  // The storage for the key/value pair itself. The data is stored as:
-  //   [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
-  uint8_t kv_data[1];   // Beginning of key/value pair
-
-  Slice key() const {
-    return Slice(kv_data, key_length);
-  }
-
-  uint8_t* mutable_val_ptr() {
-    int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*));
-    return &kv_data[val_offset];
-  }
-
-  const uint8_t* val_ptr() const {
-    return const_cast<RLHandle*>(this)->mutable_val_ptr();
-  }
-
-  Slice value() const {
-    return Slice(val_ptr(), val_length);
-  }
-};
-
-// We provide our own simple hash table since it removes a whole bunch
-// of porting hacks and is also faster than some of the built-in hash
-// table implementations in some of the compiler/runtime combinations
-// we have tested.  E.g., readrandom speeds up by ~5% over the g++
-// 4.4.3's builtin hashtable.
-class HandleTable {
- public:
-  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
-  ~HandleTable() { delete[] list_; }
-
-  RLHandle* Lookup(const Slice& key, uint32_t hash) {
-    return *FindPointer(key, hash);
-  }
-
-  RLHandle* Insert(RLHandle* h) {
-    RLHandle** ptr = FindPointer(h->key(), h->hash);
-    RLHandle* old = *ptr;
-    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
-    *ptr = h;
-    if (old == nullptr) {
-      ++elems_;
-      if (elems_ > length_) {
-        // Since each cache entry is fairly large, we aim for a small
-        // average linked list length (<= 1).
-        Resize();
-      }
-    }
-    return old;
-  }
-
-  RLHandle* Remove(const Slice& key, uint32_t hash) {
-    RLHandle** ptr = FindPointer(key, hash);
-    RLHandle* result = *ptr;
-    if (result != nullptr) {
-      *ptr = result->next_hash;
-      --elems_;
-    }
-    return result;
-  }
-
- private:
-  // The table consists of an array of buckets where each bucket is
-  // a linked list of cache entries that hash into the bucket.
-  uint32_t length_;
-  uint32_t elems_;
-  RLHandle** list_;
-
-  // Return a pointer to slot that points to a cache entry that
-  // matches key/hash.  If there is no such cache entry, return a
-  // pointer to the trailing slot in the corresponding linked list.
-  RLHandle** FindPointer(const Slice& key, uint32_t hash) {
-    RLHandle** ptr = &list_[hash & (length_ - 1)];
-    while (*ptr != nullptr &&
-           ((*ptr)->hash != hash || key != (*ptr)->key())) {
-      ptr = &(*ptr)->next_hash;
-    }
-    return ptr;
-  }
-
-  void Resize() {
-    uint32_t new_length = 16;
-    while (new_length < elems_ * 1.5) {
-      new_length *= 2;
-    }
-    auto new_list = new RLHandle*[new_length];
-    memset(new_list, 0, sizeof(new_list[0]) * new_length);
-    uint32_t count = 0;
-    for (uint32_t i = 0; i < length_; i++) {
-      RLHandle* h = list_[i];
-      while (h != nullptr) {
-        RLHandle* next = h->next_hash;
-        uint32_t hash = h->hash;
-        RLHandle** ptr = &new_list[hash & (new_length - 1)];
-        h->next_hash = *ptr;
-        *ptr = h;
-        h = next;
-        count++;
-      }
-    }
-    DCHECK_EQ(elems_, count);
-    delete[] list_;
-    list_ = new_list;
-    length_ = new_length;
-  }
 };
 
-string ToString(Cache::EvictionPolicy p) {
-  switch (p) {
-    case Cache::EvictionPolicy::FIFO:
-      return "fifo";
-    case Cache::EvictionPolicy::LRU:
-      return "lru";
-    default:
-      LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
-  }
-  return "unknown";
-}
-
-// A single shard of sharded cache.
+// A single shard of a cache that uses a recency list based eviction policy
 template<Cache::EvictionPolicy policy>
-class CacheShard {
+class RLCacheShard : public CacheShard {
+  static_assert(
+      policy == Cache::EvictionPolicy::LRU || policy == Cache::EvictionPolicy::FIFO,
+      "RLCacheShard only supports LRU or FIFO");
+
  public:
-  explicit CacheShard(kudu::MemTracker* tracker);
-  ~CacheShard();
+  explicit RLCacheShard(kudu::MemTracker* tracker);
+  ~RLCacheShard();
 
   // Separate from constructor so caller can easily make an array of CacheShard
-  void SetCapacity(size_t capacity) {
+  void SetCapacity(size_t capacity) override {
     capacity_ = capacity;
     max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
   }
 
-  Cache::Handle* Insert(RLHandle* handle, Cache::EvictionCallback* eviction_callback);
-  // Like Cache::Lookup, but with an extra "hash" parameter.
-  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
-  void Release(Cache::Handle* handle);
-  void Erase(const Slice& key, uint32_t hash);
-  size_t Invalidate(const Cache::InvalidationControl& ctl);
+  HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) override;
+  void Free(HandleBase* handle) override;
+  HandleBase* Insert(HandleBase* handle,
+      Cache::EvictionCallback* eviction_callback) override;
+  HandleBase* Lookup(const Slice& key, uint32_t hash, bool caching) override;
+  void Release(HandleBase* handle) override;
+  void Erase(const Slice& key, uint32_t hash) override;
+  size_t Invalidate(const Cache::InvalidationControl& ctl) override;
 
  private:
   void RL_Remove(RLHandle* e);
@@ -270,16 +160,16 @@ class CacheShard {
 };
 
 template<Cache::EvictionPolicy policy>
-CacheShard<policy>::CacheShard(kudu::MemTracker* tracker)
-    : usage_(0),
-      mem_tracker_(tracker) {
+RLCacheShard<policy>::RLCacheShard(kudu::MemTracker* tracker)
+  : usage_(0),
+    mem_tracker_(tracker) {
   // Make empty circular linked list.
   rl_.next = &rl_;
   rl_.prev = &rl_;
 }
 
 template<Cache::EvictionPolicy policy>
-CacheShard<policy>::~CacheShard() {
+RLCacheShard<policy>::~RLCacheShard() {
   for (RLHandle* e = rl_.next; e != &rl_; ) {
     RLHandle* next = e->next;
     DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1)
@@ -293,23 +183,23 @@ CacheShard<policy>::~CacheShard() {
 }
 
 template<Cache::EvictionPolicy policy>
-bool CacheShard<policy>::Unref(RLHandle* e) {
+bool RLCacheShard<policy>::Unref(RLHandle* e) {
   DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
   return e->refs.fetch_sub(1) == 1;
 }
 
 template<Cache::EvictionPolicy policy>
-void CacheShard<policy>::FreeEntry(RLHandle* e) {
+void RLCacheShard<policy>::FreeEntry(RLHandle* e) {
   DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
   if (e->eviction_callback) {
     e->eviction_callback->EvictedEntry(e->key(), e->value());
   }
-  UpdateMemTracker(-static_cast<int64_t>(e->charge));
-  delete [] e;
+  UpdateMemTracker(-static_cast<int64_t>(e->charge()));
+  Free(e);
 }
 
 template<Cache::EvictionPolicy policy>
-void CacheShard<policy>::UpdateMemTracker(int64_t delta) {
+void RLCacheShard<policy>::UpdateMemTracker(int64_t delta) {
   int64_t old_deferred = deferred_consumption_.fetch_add(delta);
   int64_t new_deferred = old_deferred + delta;
 
@@ -321,53 +211,84 @@ void CacheShard<policy>::UpdateMemTracker(int64_t delta) {
 }
 
 template<Cache::EvictionPolicy policy>
-void CacheShard<policy>::RL_Remove(RLHandle* e) {
+void RLCacheShard<policy>::RL_Remove(RLHandle* e) {
   e->next->prev = e->prev;
   e->prev->next = e->next;
-  DCHECK_GE(usage_, e->charge);
-  usage_ -= e->charge;
+  DCHECK_GE(usage_, e->charge());
+  usage_ -= e->charge();
 }
 
 template<Cache::EvictionPolicy policy>
-void CacheShard<policy>::RL_Append(RLHandle* e) {
+void RLCacheShard<policy>::RL_Append(RLHandle* e) {
   // Make "e" newest entry by inserting just before rl_.
   e->next = &rl_;
   e->prev = rl_.prev;
   e->prev->next = e;
   e->next->prev = e;
-  usage_ += e->charge;
+  usage_ += e->charge();
 }
 
 template<>
-void CacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle* /* e */) {
+void RLCacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle* /* e */) {
 }
 
 template<>
-void CacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) {
+void RLCacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) {
   RL_Remove(e);
   RL_Append(e);
 }
 
 template<Cache::EvictionPolicy policy>
-Cache::Handle* CacheShard<policy>::Lookup(const Slice& key,
-                                          uint32_t hash,
-                                          bool caching) {
+HandleBase* RLCacheShard<policy>::Allocate(Slice key, uint32_t hash, int val_len,
+    int charge) {  // builds the handle, key and value space in one heap buffer
+  int key_len = key.size();
+  DCHECK_GE(key_len, 0);
+  DCHECK_GE(val_len, 0);
+  int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
+  uint8_t* buf = new uint8_t[sizeof(RLHandle)
+                             + key_len_padded + val_len]; // padded key + value bytes
+  // TODO(KUDU-1091): account for the footprint of structures used by Cache's
+  //                  internal housekeeping (RL handles, etc.) in case of
+  //                  non-automatic charge.
+  int calc_charge =
+    (charge == Cache::kAutomaticCharge) ? kudu::kudu_malloc_usable_size(buf) : charge;
+  uint8_t* kv_ptr = buf + sizeof(RLHandle);  // key/value bytes follow the handle
+  RLHandle* handle = new (buf) RLHandle(kv_ptr, key, hash, val_len,
+      calc_charge);  // placement new: the handle object lives at the front of buf
+  return handle;
+}
+
+template<Cache::EvictionPolicy policy>
+void RLCacheShard<policy>::Free(HandleBase* handle) {
+  // We allocate the handle as a uint8_t array, then we call a placement new,
+  // which calls the constructor. For symmetry, we call the destructor and then
+  // delete on the uint8_t array.
+  RLHandle* h = static_cast<RLHandle*>(handle);
+  h->~RLHandle();  // explicit destructor call; the storage is released separately below
+  uint8_t* data = reinterpret_cast<uint8_t*>(handle);  // assumes base at offset 0 - TODO confirm
+  delete [] data;
+}
+
+template<Cache::EvictionPolicy policy>
+HandleBase* RLCacheShard<policy>::Lookup(const Slice& key,
+                                         uint32_t hash,
+                                         bool caching) {
   RLHandle* e;
   {
     std::lock_guard<decltype(mutex_)> l(mutex_);
-    e = table_.Lookup(key, hash);
+    e = static_cast<RLHandle*>(table_.Lookup(key, hash));
     if (e != nullptr) {
       e->refs.fetch_add(1, std::memory_order_relaxed);
       RL_UpdateAfterLookup(e);
     }
   }
 
-  return reinterpret_cast<Cache::Handle*>(e);
+  return e;
 }
 
 template<Cache::EvictionPolicy policy>
-void CacheShard<policy>::Release(Cache::Handle* handle) {
-  RLHandle* e = reinterpret_cast<RLHandle*>(handle);
+void RLCacheShard<policy>::Release(HandleBase* handle) {
+  RLHandle* e = static_cast<RLHandle*>(handle);
   bool last_reference = Unref(e);
   if (last_reference) {
     FreeEntry(e);
@@ -375,15 +296,16 @@ void CacheShard<policy>::Release(Cache::Handle* handle) {
 }
 
 template<Cache::EvictionPolicy policy>
-Cache::Handle* CacheShard<policy>::Insert(
-    RLHandle* handle,
+HandleBase* RLCacheShard<policy>::Insert(
+    HandleBase* handle_in,
     Cache::EvictionCallback* eviction_callback) {
+  RLHandle* handle = static_cast<RLHandle*>(handle_in);
   // Set the remaining RLHandle members which were not already allocated during
   // Allocate().
   handle->eviction_callback = eviction_callback;
-  // Two refs for the handle: one from CacheShard, one for the returned handle.
+  // Two refs for the handle: one from RLCacheShard, one for the returned handle.
   handle->refs.store(2, std::memory_order_relaxed);
-  UpdateMemTracker(handle->charge);
+  UpdateMemTracker(handle->charge());
 
   RLHandle* to_remove_head = nullptr;
   {
@@ -391,7 +313,7 @@ Cache::Handle* CacheShard<policy>::Insert(
 
     RL_Append(handle);
 
-    RLHandle* old = table_.Insert(handle);
+    RLHandle* old = static_cast<RLHandle*>(table_.Insert(handle));
     if (old != nullptr) {
       RL_Remove(old);
       if (Unref(old)) {
@@ -403,7 +325,7 @@ Cache::Handle* CacheShard<policy>::Insert(
     while (usage_ > capacity_ && rl_.next != &rl_) {
       RLHandle* old = rl_.next;
       RL_Remove(old);
-      table_.Remove(old->key(), old->hash);
+      table_.Remove(old->key(), old->hash());
       if (Unref(old)) {
         old->next = to_remove_head;
         to_remove_head = old;
@@ -419,16 +341,16 @@ Cache::Handle* CacheShard<policy>::Insert(
     to_remove_head = next;
   }
 
-  return reinterpret_cast<Cache::Handle*>(handle);
+  return handle;
 }
 
 template<Cache::EvictionPolicy policy>
-void CacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
+void RLCacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
   RLHandle* e;
   bool last_reference = false;
   {
     std::lock_guard<decltype(mutex_)> l(mutex_);
-    e = table_.Remove(key, hash);
+    e = static_cast<RLHandle*>(table_.Remove(key, hash));
     if (e != nullptr) {
       RL_Remove(e);
       last_reference = Unref(e);
@@ -442,7 +364,7 @@ void CacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
 }
 
 template<Cache::EvictionPolicy policy>
-size_t CacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
+size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
   size_t invalid_entry_count = 0;
   size_t valid_entry_count = 0;
   RLHandle* to_remove_head = nullptr;
@@ -466,7 +388,7 @@ size_t CacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
       h = h->next;
 
       RL_Remove(h_to_remove);
-      table_.Remove(h_to_remove->key(), h_to_remove->hash);
+      table_.Remove(h_to_remove->key(), h_to_remove->hash());
       if (Unref(h_to_remove)) {
         h_to_remove->next = to_remove_head;
         to_remove_head = h_to_remove;
@@ -485,6 +407,8 @@ size_t CacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
   return invalid_entry_count;
 }
 
+}  // end anonymous namespace
+
 // Determine the number of bits of the hash that should be used to determine
 // the cache shard. This, in turn, determines the number of shards.
 int DetermineShardBits() {
@@ -494,123 +418,17 @@ int DetermineShardBits() {
   return bits;
 }
 
-template<Cache::EvictionPolicy policy>
-class ShardedCache : public Cache {
- public:
-  explicit ShardedCache(size_t capacity, const string& id)
-        : shard_bits_(DetermineShardBits()) {
-    // A cache is often a singleton, so:
-    // 1. We reuse its MemTracker if one already exists, and
-    // 2. It is directly parented to the root MemTracker.
-    mem_tracker_ = kudu::MemTracker::FindOrCreateGlobalTracker(
-        -1, strings::Substitute("$0-sharded_$1_cache", id, ToString(policy)));
-
-    int num_shards = 1 << shard_bits_;
-    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
-    for (int s = 0; s < num_shards; s++) {
-      unique_ptr<CacheShard<policy>> shard(
-          new CacheShard<policy>(mem_tracker_.get()));
-      shard->SetCapacity(per_shard);
-      shards_.push_back(shard.release());
-    }
-  }
-
-  virtual ~ShardedCache() {
-    STLDeleteElements(&shards_);
-  }
-
-  UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override {
-    const uint32_t hash = HashSlice(key);
-    return UniqueHandle(
-        shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE),
-        Cache::HandleDeleter(this));
-  }
-
-  void Erase(const Slice& key) override {
-    const uint32_t hash = HashSlice(key);
-    shards_[Shard(hash)]->Erase(key, hash);
-  }
-
-  Slice Value(const UniqueHandle& handle) const override {
-    return reinterpret_cast<const RLHandle*>(handle.get())->value();
-  }
-
-  UniqueHandle Insert(UniquePendingHandle handle,
-                      Cache::EvictionCallback* eviction_callback) override {
-    RLHandle* h = reinterpret_cast<RLHandle*>(DCHECK_NOTNULL(handle.release()));
-    return UniqueHandle(
-        shards_[Shard(h->hash)]->Insert(h, eviction_callback),
-        Cache::HandleDeleter(this));
-  }
-
-  UniquePendingHandle Allocate(Slice key, int val_len, int charge) override {
-    int key_len = key.size();
-    DCHECK_GE(key_len, 0);
-    DCHECK_GE(val_len, 0);
-    int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
-    UniquePendingHandle h(reinterpret_cast<PendingHandle*>(
-        new uint8_t[sizeof(RLHandle)
-                    + key_len_padded + val_len // the kv_data VLA data
-                    - 1 // (the VLA has a 1-byte placeholder)
-                   ]),
-        PendingHandleDeleter(this));
-    RLHandle* handle = reinterpret_cast<RLHandle*>(h.get());
-    handle->key_length = key_len;
-    handle->val_length = val_len;
-    // TODO(KUDU-1091): account for the footprint of structures used by Cache's
-    //                  internal housekeeping (RL handles, etc.) in case of
-    //                  non-automatic charge.
-    handle->charge = (charge == kAutomaticCharge) ? kudu::kudu_malloc_usable_size(h.get())
-                                                  : charge;
-    handle->hash = HashSlice(key);
-    memcpy(handle->kv_data, key.data(), key_len);
-
-    return h;
-  }
-
-  uint8_t* MutableValue(UniquePendingHandle* handle) override {
-    return reinterpret_cast<RLHandle*>(handle->get())->mutable_val_ptr();
-  }
-
-  size_t Invalidate(const InvalidationControl& ctl) override {
-    size_t invalidated_count = 0;
-    for (auto& shard: shards_) {
-      invalidated_count += shard->Invalidate(ctl);
-    }
-    return invalidated_count;
-  }
-
- protected:
-  void Release(Handle* handle) override {
-    RLHandle* h = reinterpret_cast<RLHandle*>(handle);
-    shards_[Shard(h->hash)]->Release(handle);
-  }
-
-  void Free(PendingHandle* h) override {
-    uint8_t* data = reinterpret_cast<uint8_t*>(h);
-    delete [] data;
-  }
-
- private:
-  static inline uint32_t HashSlice(const Slice& s) {
-    return util_hash::CityHash64(
-      reinterpret_cast<const char *>(s.data()), s.size());
-  }
-
-  uint32_t Shard(uint32_t hash) {
-    // Widen to uint64 before shifting, or else on a single CPU,
-    // we would try to shift a uint32_t by 32 bits, which is undefined.
-    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
+string ToString(Cache::EvictionPolicy p) {
+  switch (p) {
+    case Cache::EvictionPolicy::FIFO:
+      return "fifo";
+    case Cache::EvictionPolicy::LRU:
+      return "lru";
+    default:
+      LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
   }
-
-  shared_ptr<kudu::MemTracker> mem_tracker_;
-  vector<CacheShard<policy>*> shards_;
-
-  // Number of bits of hash used to determine the shard.
-  const int shard_bits_;
-};
-
-}  // end anonymous namespace
+  return "unknown";
+}
 
 template<>
 Cache* NewCache<Cache::EvictionPolicy::FIFO,
@@ -624,6 +442,11 @@ Cache* NewCache<Cache::EvictionPolicy::LRU,
   return new ShardedCache<Cache::EvictionPolicy::LRU>(capacity, id);
 }
 
+template<Cache::EvictionPolicy policy>
+CacheShard* NewCacheShard(kudu::MemTracker* mem_tracker) {  // caller owns the result
+  return new RLCacheShard<policy>(mem_tracker);  // recency-list (LRU/FIFO) shard
+}
+
 std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
   switch (mem_type) {
     case Cache::MemoryType::DRAM:
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 06a53fb..57a5164 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -79,6 +79,7 @@ be/src/kudu/security/x509_check_host.cc
 be/src/util/cache/cache.h
 be/src/util/cache/cache.cc
 be/src/util/cache/cache-test.cc
+be/src/util/cache/cache-internal.h
 
 # http://www.apache.org/legal/src-headers.html: "Short informational text files; for
 # example README, INSTALL files. The expectation is that these files make it obvious which