Posted to commits@impala.apache.org by bo...@apache.org on 2022/03/04 18:02:33 UTC

[impala] 02/02: IMPALA-9433: Improved caching of HdfsFileHandles

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7a28295905e97e41503670bbdd92e64ea47f0078
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Wed Feb 2 15:23:30 2022 +0100

    IMPALA-9433: Improved caching of HdfsFileHandles
    
    Separated the LRU caching functionality into a templated LruMultiCache class.
    
    Replaced the std::multimap with an std::unordered_map of std::list for O(1)
    lookups and less memory overhead, as each key is stored only once. Added a
    boost::intrusive::list to track the LRU relations with less overhead.
    Added an O(1) release method (instead of O(n)) with minimal memory overhead.
    Implemented an RAII Accessor to remove the responsibility of releasing
    the objects from the user.
    
    Wrapped the cache accessor and the related DiskIOManager metrics in a
    FileHandleCache::Accessor. Removed the Release*() call trees from
    FileHandleCache and DiskIOManager, and removed the scoped exit from
    HdfsFileReader, as these are now handled automatically.
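
    A minimal sketch of the new calling pattern (simplified from the
    hdfs-file-reader.cc change below; error handling and timers omitted):

        FileHandleCache::Accessor accessor;
        RETURN_IF_ERROR(io_mgr->GetCachedHdfsFileHandle(
            hdfs_fs, fname, mtime, request_context, &accessor));
        hdfsFile hdfs_file = accessor.Get()->file();
        // ... read using hdfs_file ...
        // The accessor releases the handle back to the cache when it goes out
        // of scope; no explicit ReleaseCachedHdfsFileHandle() call is needed.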
    
    Testing:
    
    Implemented extensive unit testing of the class, including forced
    rehashes, collisions, capacity overshoot, explicit/automatic release
    and destroy.
    
    Ran tests/custom_cluster/test_hdfs_fd_caching.py to verify
    FileHandleCache::Accessor behaviour through metrics.
    
    Ran bin/single_node_perf_run.py with TPCH and TPC-DS on parquet tables,
    no visible change in performance:
    TPCH   scale=10 iterations=100: Delta(Avg)=-0.67% Delta(GeoMean)=-0.49%
    TPC-DS scale=10 iterations= 50: Delta(Avg)=-0.02% Delta(GeoMean)= 0.00%
    
    Tested some manual queries on functional_parquet.widetable_1000_cols
    with 64 threads but did not notice significant changes in scan times.
    
    Change-Id: I6b5c5e9e2b5db2847ab88c41f667c9ca1b03d51a
    Reviewed-on: http://gerrit.cloudera.org:8080/18191
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/disk-io-mgr.cc        |  29 +-
 be/src/runtime/io/disk-io-mgr.h         |  21 +-
 be/src/runtime/io/handle-cache.h        | 142 ++++------
 be/src/runtime/io/handle-cache.inline.h | 235 +++++++---------
 be/src/runtime/io/hdfs-file-reader.cc   |  25 +-
 be/src/util/CMakeLists.txt              |   2 +
 be/src/util/lru-multi-cache-test.cc     | 459 ++++++++++++++++++++++++++++++++
 be/src/util/lru-multi-cache.h           | 240 +++++++++++++++++
 be/src/util/lru-multi-cache.inline.h    | 288 ++++++++++++++++++++
 9 files changed, 1171 insertions(+), 270 deletions(-)

diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 22bead3..e80b0fb 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -873,14 +873,12 @@ void DiskIoMgr::ReleaseExclusiveHdfsFileHandle(unique_ptr<ExclusiveHdfsFileHandl
   fid.reset();
 }
 
-Status DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
-    std::string* fname, int64_t mtime, RequestContext *reader,
-    CachedHdfsFileHandle** handle_out) {
+Status DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname,
+    int64_t mtime, RequestContext* reader, FileHandleCache::Accessor* accessor) {
   bool cache_hit;
   SCOPED_TIMER(reader->open_file_timer_);
-  RETURN_IF_ERROR(file_handle_cache_.GetFileHandle(fs, fname, mtime, false, handle_out,
-      &cache_hit));
-  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
+  RETURN_IF_ERROR(
+      file_handle_cache_.GetFileHandle(fs, fname, mtime, false, accessor, &cache_hit));
   if (cache_hit) {
     ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L);
     ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L);
@@ -893,24 +891,17 @@ Status DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
   return Status::OK();
 }
 
-void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname,
-    CachedHdfsFileHandle* fid) {
-  file_handle_cache_.ReleaseFileHandle(fname, fid, false);
-  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
-}
-
 Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname,
-    int64_t mtime, RequestContext* reader, CachedHdfsFileHandle** fid) {
+    int64_t mtime, RequestContext* reader, FileHandleCache::Accessor* accessor) {
   bool cache_hit;
   SCOPED_TIMER(reader->open_file_timer_);
   ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED->Increment(1L);
-  file_handle_cache_.ReleaseFileHandle(fname, *fid, true);
-  // The old handle has been destroyed, so *fid must be overwritten before returning.
-  *fid = nullptr;
-  Status status = file_handle_cache_.GetFileHandle(fs, fname, mtime, true, fid,
-      &cache_hit);
+
+  accessor->Destroy();
+
+  Status status =
+      file_handle_cache_.GetFileHandle(fs, fname, mtime, true, accessor, &cache_hit);
   if (!status.ok()) {
-    ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
     return status;
   }
   DCHECK(!cache_hit);
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index b22a67f..b586908 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -361,22 +361,19 @@ class DiskIoMgr : public CacheLineAligned {
   void ReleaseExclusiveHdfsFileHandle(std::unique_ptr<ExclusiveHdfsFileHandle> fid);
 
   /// Given a FS handle, name and last modified time of the file, gets a
-  /// CachedHdfsFileHandle from the file handle cache and returns it via 'fid'.
-  /// Records the time spent opening the handle in 'reader'. On success, records
-  /// statistics about whether this was a cache hit or miss in the 'reader' as well as
-  /// at the system level. In case of an error, returns status and 'fid' is untouched.
-  Status GetCachedHdfsFileHandle(const hdfsFS& fs,
-      std::string* fname, int64_t mtime, RequestContext* reader,
-      CachedHdfsFileHandle** fid) WARN_UNUSED_RESULT;
-
-  /// Releases a file handle back to the file handle cache when it is no longer in use.
-  void ReleaseCachedHdfsFileHandle(std::string* fname, CachedHdfsFileHandle* fid);
+  /// CachedHdfsFileHandle accessor from the file handle cache and returns it via
+  /// 'accessor'. Records the time spent opening the handle in 'reader'. On success,
+  /// records statistics about whether this was a cache hit or miss in the 'reader' as
+  /// well as at the system level. In case of an error, the error status is returned and
+  /// 'accessor' is left untouched.
+  Status GetCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
+      RequestContext* reader, FileHandleCache::Accessor* accessor) WARN_UNUSED_RESULT;
 
   /// Reopens a file handle by destroying the file handle and getting a fresh
-  /// file handle from the cache. Records the time spent reopening the handle
+  /// file handle accessor from the cache. Records the time spent reopening the handle
   /// in 'reader'. Returns an error if the file could not be reopened.
   Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
-      RequestContext* reader, CachedHdfsFileHandle** fid) WARN_UNUSED_RESULT;
+      RequestContext* reader, FileHandleCache::Accessor* accessor) WARN_UNUSED_RESULT;
 
   // Function to change the underlying LocalFileSystem object used for disk I/O.
   // DiskIoMgr will also take responsibility of the received LocalFileSystem pointer.
diff --git a/be/src/runtime/io/handle-cache.h b/be/src/runtime/io/handle-cache.h
index ef751eb..fb7fb8e 100644
--- a/be/src/runtime/io/handle-cache.h
+++ b/be/src/runtime/io/handle-cache.h
@@ -21,12 +21,11 @@
 #include <list>
 #include <map>
 #include <memory>
-#include <mutex>
 
 #include "common/hdfs.h"
 #include "common/status.h"
 #include "util/aligned-new.h"
-#include "util/spinlock.h"
+#include "util/lru-multi-cache.h"
 #include "util/thread.h"
 
 namespace impala {
@@ -86,18 +85,18 @@ class ExclusiveHdfsFileHandle : public HdfsFileHandle {
 /// between concurrent threads. The `capacity` is split between the partitions and is
 /// enforced independently.
 ///
-/// Threads check out a file handle for exclusive access and return it when finished.
-/// If the file handle is not already present in the cache or all file handles for this
-/// file are checked out, the file handle is constructed and added to the cache.
-/// The cache can contain multiple file handles for the same file. If a file handle
-/// is checked out, it cannot be evicted from the cache. In this case, a cache can
-/// exceed the specified capacity.
+/// Threads check out a file handle for exclusive access; it is released automatically by
+/// the RAII accessor. If the file handle is not already present in the cache or all file
+/// handles for this file are checked out, a new file handle is emplaced in the cache. The
+/// cache can contain multiple file handles for the same file. If a file handle is checked
+/// out, it cannot be evicted from the cache, so the cache can exceed the specified
+/// capacity.
 ///
-/// Some remote file systems such as S3 keep a connection as part of the file handle.
-/// The file handle cache is not suitable for those systems, as the cache size can
-/// exceed the limit on the number of concurrent connections. HDFS does not maintain
-/// a connection in the file handle, so remote HDFS file handles do not have this
-/// restriction.
+/// Remote file systems may keep a connection as part of the file handle without
+/// supporting unbuffering. The file handle cache is not suitable for those systems, as
+/// the cache size can exceed the limit on the number of concurrent connections. HDFS does
+/// not maintain a connection in the file handle, and the S3A client supports unbuffering
+/// since IMPALA-8428, so neither has this restriction.
 ///
 /// If there is a file handle in the cache and the underlying file is deleted,
 /// the file handle might keep the file from being deleted at the OS level. This can
@@ -108,7 +107,45 @@ class ExclusiveHdfsFileHandle : public HdfsFileHandle {
 /// TODO: The cache should also evict file handles more aggressively if the file handle's
 /// mtime is older than the file's current mtime.
 class FileHandleCache {
+ private:
+  /// Each partition operates independently, and thus has its own thread-safe cache.
+  /// To avoid lock contention due to false sharing, the partitions are aligned to
+  /// cache line boundaries.
+  struct FileHandleCachePartition : public CacheLineAligned {
+    // The cache key is a pair of filename and mtime.
+    // std::pair is used to spare the boilerplate of a custom hash function.
+    typedef LruMultiCache<std::pair<std::string, int64_t>, CachedHdfsFileHandle>
+        CacheType;
+    CacheType cache;
+  };
+
  public:
+  /// RAII accessor built over LruMultiCache::Accessor to handle metrics and unbuffering.
+  /// Composition is used instead of inheritance to support usage as an in/out parameter.
+  class Accessor {
+   public:
+    Accessor();
+    Accessor(FileHandleCachePartition::CacheType::Accessor&& cache_accessor);
+    Accessor(Accessor&&) = default;
+    Accessor& operator=(Accessor&&) = default;
+
+    DISALLOW_COPY_AND_ASSIGN(Accessor);
+
+    /// Handles metrics and unbuffering
+    ~Accessor();
+
+    /// The Set function can be used if the Accessor is used as an in/out parameter.
+    void Set(FileHandleCachePartition::CacheType::Accessor&& cache_accessor);
+
+    /// Interface mimics LruMultiCache::Accessor's interface, handles metrics
+    CachedHdfsFileHandle* Get();
+    void Release();
+    void Destroy();
+
+   private:
+    FileHandleCachePartition::CacheType::Accessor cache_accessor_;
+  };
+
   /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
   /// partitions. If the capacity does not split evenly, then the capacity is rounded
   /// up. The cache will age out any file handle that is unused for
@@ -123,88 +160,25 @@ class FileHandleCache {
   /// exceed the limit.
   Status Init() WARN_UNUSED_RESULT;
 
-  /// Get a file handle from the cache for the specified filename (fname) and
+  /// Get a file handle accessor from the cache for the specified filename (fname) and
   /// last modification time (mtime). This will hash the filename to determine
   /// which partition to use for this file handle.
   ///
   /// If 'require_new_handle' is false and the partition contains an available handle,
-  /// the handle is returned and cache_hit is set to true. Otherwise, the partition will
-  /// try to construct a file handle and add it to the partition. On success, the new
-  /// file handle will be returned with cache_hit set to false. On failure, nullptr will
-  /// be returned. In either case, the partition may evict a file handle to make room
-  /// for the new file handle.
+  /// an accessor is returned and cache_hit is set to true. Otherwise, the partition will
+  /// emplace a new file handle and return an accessor to it with cache_hit set to false.
+  /// On failure, an empty accessor is returned. In either case, the partition may evict
+  /// a file handle to make room for the new file handle.
   ///
-  /// This obtains exclusive control over the returned file handle. It must be paired
-  /// with a call to ReleaseFileHandle to release exclusive control.
-  Status GetFileHandle(const hdfsFS& fs, std::string* fname,
-      int64_t mtime, bool require_new_handle, CachedHdfsFileHandle** handle_out,
-      bool* cache_hit) WARN_UNUSED_RESULT;
-
-  /// Release the exclusive hold on the specified file handle (which was obtained
-  /// by calling GetFileHandle). The cache may evict a file handle if the cache is
-  /// above capacity. If 'destroy_handle' is true, immediately remove this handle
-  /// from the cache.
-  void ReleaseFileHandle(std::string* fname, CachedHdfsFileHandle* fh,
-      bool destroy_handle);
+  /// This obtains exclusive control over the returned file handle.
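+  ///
+  /// Illustrative usage sketch (names are hypothetical, error handling omitted):
+  ///
+  ///   FileHandleCache::Accessor accessor;
+  ///   bool cache_hit;
+  ///   RETURN_IF_ERROR(cache.GetFileHandle(fs, &fname, mtime, false, &accessor,
+  ///       &cache_hit));
+  ///   hdfsFile file = accessor.Get()->file();
+  ///   // The handle is released back to the cache when 'accessor' goes out of scope.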
+  Status GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
+      bool require_new_handle, Accessor* accessor, bool* cache_hit) WARN_UNUSED_RESULT;
 
  private:
-  struct FileHandleEntry;
-  typedef std::multimap<std::string, FileHandleEntry> MapType;
-
-  struct LruListEntry {
-    LruListEntry(typename MapType::iterator map_entry_in);
-    typename MapType::iterator map_entry;
-    uint64_t timestamp_seconds;
-  };
-  typedef std::list<LruListEntry> LruListType;
-
-  struct FileHandleEntry {
-    FileHandleEntry(std::unique_ptr<CachedHdfsFileHandle> fh_in, LruListType& lru_list)
-    : fh(std::move(fh_in)), lru_entry(lru_list.end()) {}
-    std::unique_ptr<CachedHdfsFileHandle> fh;
-
-    /// in_use is true for a file handle checked out via GetFileHandle() that has not
-    /// been returned via ReleaseFileHandle().
-    bool in_use = false;
-
-    /// Iterator to this element's location in the LRU list. This only points to a
-    /// valid location when in_use is true. For error-checking, this is set to
-    /// lru_list.end() when in_use is false.
-    typename LruListType::iterator lru_entry;
-  };
-
-  /// Each partition operates independently, and thus has its own cache, LRU list,
-  /// and corresponding lock. To avoid contention on the lock_ due to false sharing
-  /// the partitions are aligned to cache line boundaries.
-  struct FileHandleCachePartition : public CacheLineAligned {
-    /// Protects access to cache and lru_list.
-    SpinLock lock;
-
-    /// Multimap from the file name to the file handles for that file. The cache
-    /// can contain multiple file handles for the same file and some may have
-    /// different mtimes if the file is being modified. All file handles are always
-    /// owned by the cache.
-    MapType cache;
-
-    /// The LRU list only contains file handles that are not in use.
-    LruListType lru_list;
-
-    /// Maximum number of file handles in cache without evicting unused file handles.
-    /// It is not a strict limit, and can be exceeded if all file handles are in use.
-    size_t capacity;
-
-    /// Current number of file handles in the cache
-    size_t size;
-  };
-
   /// Periodic check to evict unused file handles. Only executed by eviction_thread_.
   void EvictHandlesLoop();
   static const int64_t EVICT_HANDLES_PERIOD_MS = 1000;
 
-  /// If the partition is above its capacity, evict the oldest unused file handles to
-  /// enforce the capacity.
-  void EvictHandles(FileHandleCachePartition& p);
-
   std::vector<FileHandleCachePartition> cache_partitions_;
 
   /// Maximum time before an unused file handle is aged out of the cache.
diff --git a/be/src/runtime/io/handle-cache.inline.h b/be/src/runtime/io/handle-cache.inline.h
index 66c4cc2..cc19630 100644
--- a/be/src/runtime/io/handle-cache.inline.h
+++ b/be/src/runtime/io/handle-cache.inline.h
@@ -21,6 +21,7 @@
 #include "runtime/io/hdfs-monitored-ops.h"
 #include "util/hash-util.h"
 #include "util/impalad-metrics.h"
+#include "util/lru-multi-cache.inline.h"
 #include "util/metrics.h"
 #include "util/time.h"
 
@@ -58,25 +59,76 @@ CachedHdfsFileHandle::~CachedHdfsFileHandle() {
   ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
 }
 
-FileHandleCache::FileHandleCache(size_t capacity,
-    size_t num_partitions, uint64_t unused_handle_timeout_secs, HdfsMonitor* hdfs_monitor)
+FileHandleCache::Accessor::Accessor() : cache_accessor_() {}
+
+FileHandleCache::Accessor::Accessor(
+    FileHandleCachePartition::CacheType::Accessor&& cache_accessor)
+  : cache_accessor_(std::move(cache_accessor)) {
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
+}
+
+void FileHandleCache::Accessor::Set(
+    FileHandleCachePartition::CacheType::Accessor&& cache_accessor) {
+  // No change if it was empty before
+  if (cache_accessor_.Get()) {
+    ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
+  }
+
+  cache_accessor_ = std::move(cache_accessor);
+
+  // No change if it has received an empty accessor
+  if (cache_accessor_.Get()) {
+    ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
+  }
+}
+
+CachedHdfsFileHandle* FileHandleCache::Accessor::Get() {
+  return cache_accessor_.Get();
+}
+
+void FileHandleCache::Accessor::Release() {
+  if (cache_accessor_.Get()) {
+    ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
+    cache_accessor_.Release();
+  }
+}
+
+void FileHandleCache::Accessor::Destroy() {
+  if (cache_accessor_.Get()) {
+    ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
+    cache_accessor_.Destroy();
+  }
+}
+
+FileHandleCache::Accessor::~Accessor() {
+  if (cache_accessor_.Get()) {
+    if (hdfsUnbufferFile(Get()->file()) != 0) {
+      VLOG_FILE << "FS does not support file handle unbuffering, closing file="
+                << cache_accessor_.GetKey()->first;
+      Destroy();
+    } else {
+      // Calling explicit release to handle metrics
+      Release();
+    }
+  }
+}
+
+FileHandleCache::FileHandleCache(size_t capacity, size_t num_partitions,
+    uint64_t unused_handle_timeout_secs, HdfsMonitor* hdfs_monitor)
   : cache_partitions_(num_partitions),
-  unused_handle_timeout_secs_(unused_handle_timeout_secs),
-  hdfs_monitor_(hdfs_monitor) {
+    unused_handle_timeout_secs_(unused_handle_timeout_secs),
+    hdfs_monitor_(hdfs_monitor) {
   DCHECK_GT(num_partitions, 0);
+
   size_t remainder = capacity % num_partitions;
   size_t base_capacity = capacity / num_partitions;
   size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity);
+
   for (FileHandleCachePartition& p : cache_partitions_) {
-    p.size = 0;
-    p.capacity = partition_capacity;
+    p.cache.SetCapacity(partition_capacity);
   }
 }
 
-FileHandleCache::LruListEntry::LruListEntry(
-    typename MapType::iterator map_entry_in)
-     : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {}
-
 FileHandleCache::~FileHandleCache() {
   shut_down_promise_.Set(true);
   if (eviction_thread_ != nullptr) eviction_thread_->Join();
@@ -87,133 +139,59 @@ Status FileHandleCache::Init() {
       &FileHandleCache::EvictHandlesLoop, this, &eviction_thread_);
 }
 
-Status FileHandleCache::GetFileHandle(
-    const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle,
-    CachedHdfsFileHandle** handle_out, bool* cache_hit) {
+Status FileHandleCache::GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
+    bool require_new_handle, FileHandleCache::Accessor* accessor, bool* cache_hit) {
   DCHECK_GT(mtime, 0);
   // Hash the key and get appropriate partition
   int index = HashUtil::Hash(fname->data(), fname->size(), 0) % cache_partitions_.size();
   FileHandleCachePartition& p = cache_partitions_[index];
 
+  auto cache_key = std::make_pair(*fname, mtime);
+
   // If this requires a new handle, skip to the creation codepath. Otherwise,
   // find an unused entry with the same mtime
   if (!require_new_handle) {
-    std::lock_guard<SpinLock> g(p.lock);
-    pair<typename MapType::iterator, typename MapType::iterator> range =
-      p.cache.equal_range(*fname);
-
-    // When picking a cached entry, always follow the ordering of the map and
-    // pick earlier entries first. This allows excessive entries for a file
-    // to age out. For example, if there are three entries for a file and only
-    // one is used at a time, only the first will be used and the other two
-    // can age out.
-    while (range.first != range.second) {
-      FileHandleEntry* elem = &range.first->second;
-      DCHECK(elem->fh.get() != nullptr);
-      if (!elem->in_use && elem->fh->mtime() == mtime) {
-        // This element is currently in the lru_list, which means that lru_entry must
-        // be an iterator pointing into the lru_list.
-        DCHECK(elem->lru_entry != p.lru_list.end());
-        // Remove the element from the lru_list and designate that it is not on
-        // the lru_list by resetting its iterator to point to the end of the list.
-        p.lru_list.erase(elem->lru_entry);
-        elem->lru_entry = p.lru_list.end();
-        *cache_hit = true;
-        elem->in_use = true;
-        *handle_out = elem->fh.get();
-        return Status::OK();
-      }
-      ++range.first;
+    auto cache_accessor = p.cache.Get(cache_key);
+
+    if (cache_accessor.Get()) {
+      // Found a handle in the cache and reserved it
+      *cache_hit = true;
+      accessor->Set(std::move(cache_accessor));
+      return Status::OK();
     }
   }
 
   // There was no entry that was free or caller asked for a new handle
-  // Opening a file handle requires talking to the NameNode, so construct
-  // the file handle without holding the lock to reduce contention.
   *cache_hit = false;
-  // Create a new file handle
-  std::unique_ptr<CachedHdfsFileHandle> new_fh;
-  new_fh.reset(new CachedHdfsFileHandle(fs, fname, mtime));
-  RETURN_IF_ERROR(new_fh->Init(hdfs_monitor_));
-
-  // Get the lock and create/move the new entry into the map
-  // This entry is new and will be immediately used. Place it as the first entry
-  // for this file in the multimap. The ordering is largely unimportant if all the
-  // existing entries are in use. However, if require_new_handle is true, there may be
-  // unused entries, so it would make more sense to insert the new entry at the front.
-  std::lock_guard<SpinLock> g(p.lock);
-  pair<typename MapType::iterator, typename MapType::iterator> range =
-      p.cache.equal_range(*fname);
-  FileHandleEntry entry(std::move(new_fh), p.lru_list);
-  typename MapType::iterator new_it = p.cache.emplace_hint(range.first,
-      *fname, std::move(entry));
-  ++p.size;
-  if (p.size > p.capacity) EvictHandles(p);
-  FileHandleEntry* new_elem = &new_it->second;
-  DCHECK(!new_elem->in_use);
-  new_elem->in_use = true;
-  *handle_out = new_elem->fh.get();
-  return Status::OK();
-}
 
-void FileHandleCache::ReleaseFileHandle(std::string* fname,
-    CachedHdfsFileHandle* fh, bool destroy_handle) {
-  DCHECK(fh != nullptr);
-  // Hash the key and get appropriate partition
-  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % cache_partitions_.size();
-  FileHandleCachePartition& p = cache_partitions_[index];
-  std::lock_guard<SpinLock> g(p.lock);
-  pair<typename MapType::iterator, typename MapType::iterator> range =
-    p.cache.equal_range(*fname);
-
-  // TODO: This can be optimized by maintaining some state in the file handle about
-  // its location in the map.
-  typename MapType::iterator release_it = range.first;
-  while (release_it != range.second) {
-    FileHandleEntry* elem = &release_it->second;
-    if (elem->fh.get() == fh) break;
-    ++release_it;
-  }
-  DCHECK(release_it != range.second);
-
-  // This file handle is no longer referenced
-  FileHandleEntry* release_elem = &release_it->second;
-  DCHECK(release_elem->in_use);
-  release_elem->in_use = false;
-  if (destroy_handle) {
-    --p.size;
-    p.cache.erase(release_it);
-    return;
-  }
-  // Hdfs can use some memory for readahead buffering. Calling unbuffer reduces
-  // this buffering so that the file handle takes up less memory when in the cache.
-  // If unbuffering is not supported, then hdfsUnbufferFile() will return a non-zero
-  // return code, and we close the file handle and remove it from the cache.
-  if (hdfsUnbufferFile(release_elem->fh->file()) == 0) {
-    // This FileHandleEntry must not be in the lru list already, because it was
-    // in use. Verify this by checking that the lru_entry is pointing to the end,
-    // which cannot be true for any element in the lru list.
-    DCHECK(release_elem->lru_entry == p.lru_list.end());
-    // Add this to the lru list, establishing links in both directions.
-    // The FileHandleEntry has an iterator to the LruListEntry and the
-    // LruListEntry has an iterator to the location of the FileHandleEntry in
-    // the cache.
-    release_elem->lru_entry = p.lru_list.emplace(p.lru_list.end(), release_it);
-    if (p.size > p.capacity) EvictHandles(p);
-  } else {
-    VLOG_FILE << "FS does not support file handle unbuffering, closing file="
-              << fname;
-    --p.size;
-    p.cache.erase(release_it);
+  // Emplace a new file handle and get access
+  auto accessor_tmp = p.cache.EmplaceAndGet(cache_key, fs, fname, mtime);
+
+  // Opening a file handle requires talking to the NameNode so it can take some time.
+  Status status = accessor_tmp.Get()->Init(hdfs_monitor_);
+  if (UNLIKELY(!status.ok())) {
+    // Remove the handle from the cache after failed initialization.
+    accessor_tmp.Destroy();
+    return status;
   }
+
+  // Moving the cache accessor to the in/out parameter
+  accessor->Set(std::move(accessor_tmp));
+
+  return Status::OK();
 }
 
 void FileHandleCache::EvictHandlesLoop() {
   while (true) {
-    for (FileHandleCachePartition& p : cache_partitions_) {
-      std::lock_guard<SpinLock> g(p.lock);
-      EvictHandles(p);
+    if (unused_handle_timeout_secs_) {
+      for (FileHandleCachePartition& p : cache_partitions_) {
+        uint64_t now = MonotonicSeconds();
+        uint64_t oldest_allowed_timestamp =
+            now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0;
+        p.cache.EvictOlderThan(oldest_allowed_timestamp);
+      }
     }
+
     // This Get() will time out until shutdown, when the promise is set.
     bool timed_out;
     shut_down_promise_.Get(EVICT_HANDLES_PERIOD_MS, &timed_out);
@@ -224,29 +202,6 @@ void FileHandleCache::EvictHandlesLoop() {
   DCHECK(shut_down_promise_.Get());
 }
 
-void FileHandleCache::EvictHandles(
-    FileHandleCache::FileHandleCachePartition& p) {
-  uint64_t now = MonotonicSeconds();
-  uint64_t oldest_allowed_timestamp =
-      now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0;
-  while (p.lru_list.size() > 0) {
-    // Peek at the oldest element
-    LruListEntry oldest_entry = p.lru_list.front();
-    typename MapType::iterator oldest_entry_map_it = oldest_entry.map_entry;
-    uint64_t oldest_entry_timestamp = oldest_entry.timestamp_seconds;
-    // If the oldest element does not need to be aged out and the cache is not over
-    // capacity, then we are done and there is nothing to evict.
-    if (p.size <= p.capacity && (unused_handle_timeout_secs_ == 0 ||
-        oldest_entry_timestamp >= oldest_allowed_timestamp)) {
-      return;
-    }
-    // Evict the oldest element
-    DCHECK(!oldest_entry_map_it->second.in_use);
-    p.cache.erase(oldest_entry_map_it);
-    p.lru_list.pop_front();
-    --p.size;
-  }
-}
 }
 }
 #endif
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 5683c5d..b128784 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -149,23 +149,18 @@ Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_
     // If the reader has an exclusive file handle, use it. Otherwise, borrow
     // a file handle from the cache.
     req_context_read_timer.Stop();
-    CachedHdfsFileHandle* borrowed_hdfs_fh = nullptr;
+
+    // RAII accessor, it will release the file handle at destruction
+    FileHandleCache::Accessor accessor;
+
     hdfsFile hdfs_file;
     if (exclusive_hdfs_fh_ != nullptr) {
       hdfs_file = exclusive_hdfs_fh_->file();
     } else {
-      RETURN_IF_ERROR(
-          io_mgr->GetCachedHdfsFileHandle(hdfs_fs_, scan_range_->file_string(),
-              scan_range_->mtime(), request_context, &borrowed_hdfs_fh));
-      hdfs_file = borrowed_hdfs_fh->file();
+      RETURN_IF_ERROR(io_mgr->GetCachedHdfsFileHandle(hdfs_fs_,
+          scan_range_->file_string(), scan_range_->mtime(), request_context, &accessor));
+      hdfs_file = accessor.Get()->file();
     }
-    // Make sure to release any borrowed file handle.
-    auto release_borrowed_hdfs_fh = MakeScopeExitTrigger([this, &borrowed_hdfs_fh]() {
-      if (borrowed_hdfs_fh != nullptr) {
-        scan_range_->io_mgr_->ReleaseCachedHdfsFileHandle(
-            scan_range_->file_string(), borrowed_hdfs_fh);
-      }
-    });
     req_context_read_timer.Start();
 
     while (*bytes_read < bytes_to_read) {
@@ -186,14 +181,14 @@ Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_
       // - first read was not successful
       // and
       // - used a borrowed file handle
-      if (!status.ok() && borrowed_hdfs_fh != nullptr) {
+      if (!status.ok() && accessor.Get()) {
         // The error may be due to a bad file handle. Reopen the file handle and retry.
         // Exclude this time from the read timers.
         req_context_read_timer.Stop();
         RETURN_IF_ERROR(
             io_mgr->ReopenCachedHdfsFileHandle(hdfs_fs_, scan_range_->file_string(),
-                scan_range_->mtime(), request_context, &borrowed_hdfs_fh));
-        hdfs_file = borrowed_hdfs_fh->file();
+                scan_range_->mtime(), request_context, &accessor));
+        hdfs_file = accessor.Get()->file();
         VLOG_FILE << "Reopening file " << scan_range_->file_string() << " with mtime "
                   << scan_range_->mtime() << " offset " << file_offset;
         req_context_read_timer.Start();
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 52e6f1e..f0d844a 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -160,6 +160,7 @@ add_library(UtilTests STATIC
   hdr-histogram-test.cc
   jwt-util-test.cc
   logging-support-test.cc
+  lru-multi-cache-test.cc
   metrics-test.cc
   min-max-filter-test.cc
   openssl-util-test.cc
@@ -230,6 +231,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hdr-histogram-test HdrHistogramTest.*)
 # to use a unified executable
 ADD_BE_LSAN_TEST(internal-queue-test)
 ADD_UNIFIED_BE_LSAN_TEST(jwt-util-test "JwtUtilTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(lru-multi-cache-test "LruMultiCache.*")
 ADD_UNIFIED_BE_LSAN_TEST(logging-support-test "LoggingSupport.*")
 ADD_UNIFIED_BE_LSAN_TEST(metrics-test "MetricsTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(min-max-filter-test "MinMaxFilterTest.*")
diff --git a/be/src/util/lru-multi-cache-test.cc b/be/src/util/lru-multi-cache-test.cc
new file mode 100644
index 0000000..2c5cb64
--- /dev/null
+++ b/be/src/util/lru-multi-cache-test.cc
@@ -0,0 +1,459 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/lru-multi-cache.inline.h"
+
+#include "testutil/gtest-util.h"
+
+#include <memory>
+
+struct CollidingKey {
+  CollidingKey(const std::string& s) : s(s) {}
+  CollidingKey(const char* s) : s(s) {}
+  std::string s;
+};
+
+bool operator==(const CollidingKey& k1, const CollidingKey& k2) {
+  return k1.s == k2.s;
+}
+
+template <>
+struct std::hash<CollidingKey> {
+  size_t operator()(const CollidingKey& k) const noexcept { return 0; }
+};
+
+namespace impala {
+
+struct TestType {
+  // Testing EmplaceAndGet with perfect forwarding
+  explicit TestType(int i, float) : i(i) {}
+  int i;
+
+  // Making sure nothing is being copied or moved
+  TestType(const TestType&) = delete;
+  TestType(TestType&&) = delete;
+
+  TestType& operator=(const TestType&) = delete;
+  TestType& operator=(const TestType&&) = delete;
+};
+
+// Notation for implied state of inner data structure:
+// 108 -> 107 -> .... -> 102 | (19) (18) (109)
+// LRU list has 108 107 106 105 104 103 102 in this order, 7 elements available
+// 19, 18 and 109 are currently in use
+// 7 + 3 = 10 total elements in cache
+
+TEST(LruMultiCache, BasicTests) {
+  LruMultiCache<std::string, TestType> cache(100);
+
+  const size_t num_of_parallel_keys = 5;
+  std::string keys[num_of_parallel_keys] = {"a", "b", "c", "d", "e"};
+  int value_bases[num_of_parallel_keys] = {1, 10, 100, 1000, 10000};
+
+  for (size_t i = 0; i < 5; i++) {
+    std::string& key = keys[i];
+    int& value_base = value_bases[i];
+
+    // {}
+    ASSERT_EQ(nullptr, cache.Get(key).Get());
+
+    auto value = cache.EmplaceAndGet(key, value_base, 1.0f);
+
+    ASSERT_EQ(nullptr, cache.Get(key).Get());
+
+    value.Release();
+
+    // 1
+    value = cache.Get(key);
+
+    ASSERT_NE(nullptr, value.Get());
+
+    value.Release();
+
+    auto value2 = cache.EmplaceAndGet(key, value_base + 1, 1.0f);
+
+    value = cache.Get(key);
+
+    ASSERT_EQ(value_base, value.Get()->i);
+
+    value.Release();
+    value2.Release();
+    // 2 -> 1
+  }
+
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(10, cache.NumberOfAvailableObjects());
+
+  for (size_t i = 0; i < 5; i++) {
+    std::string& key = keys[i];
+    int& value_base = value_bases[i];
+
+    // 2 -> 1
+    auto value2 = cache.Get(key);
+
+    ASSERT_EQ(value_base + 1, value2.Get()->i);
+
+    auto value = cache.Get(key);
+
+    ASSERT_EQ(value_base, value.Get()->i);
+
+    value2.Release();
+    value.Release();
+
+    // 1 -> 2
+
+    value = cache.Get(key);
+
+    ASSERT_EQ(value_base, value.Get()->i);
+
+    cache.Rehash();
+
+    value2 = cache.Get(key);
+
+    ASSERT_EQ(value_base + 1, value2.Get()->i);
+
+    value.Release();
+    value2.Release();
+    // 2 -> 1
+
+    cache.Rehash();
+  }
+}
+
+TEST(LruMultiCache, EvictionTests) {
+  const size_t cache_capacity = 10;
+  LruMultiCache<std::string, TestType> cache(cache_capacity);
+  std::string key = "a";
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    cache.EmplaceAndGet(key, i, 1.0f).Release();
+    ASSERT_EQ(i + 1, cache.Size());
+    ASSERT_EQ(i + 1, cache.NumberOfAvailableObjects());
+  }
+
+  // 9 -> 8 .... -> 0
+  auto value = cache.Get(key);
+
+  ASSERT_EQ(9, value.Get()->i);
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(9, cache.NumberOfAvailableObjects());
+
+  value.Release();
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    cache.EmplaceAndGet(key, 10 + i, 1.0f).Release();
+    ASSERT_EQ(10, cache.Size());
+    ASSERT_EQ(10, cache.NumberOfAvailableObjects());
+  }
+
+  // 19 -> 18 .... -> 10
+  value = cache.Get(key);
+  ASSERT_EQ(19, value.Get()->i);
+
+  // 18 .... -> 10 | (19)
+  auto value2 = cache.Get(key);
+  ASSERT_EQ(18, value2.Get()->i);
+
+  // 17 -> 16 .... -> 10 | (19) (18)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(8, cache.NumberOfAvailableObjects());
+
+  cache.Rehash();
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    cache.EmplaceAndGet(key, 100 + i, 1.0f).Release();
+    ASSERT_EQ(10, cache.Size());
+    ASSERT_EQ(8, cache.NumberOfAvailableObjects());
+  }
+
+  // 109 -> 108 .... -> 102 | (19) (18)
+  auto value3 = cache.Get(key);
+  ASSERT_EQ(109, value3.Get()->i);
+
+  // 108 .... -> 102 | (19) (18) (109)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(7, cache.NumberOfAvailableObjects());
+
+  value2.Release();
+
+  // 18 -> 108 .... -> 102 | (19) (109)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(8, cache.NumberOfAvailableObjects());
+
+  auto value4 = cache.Get(key);
+  ASSERT_EQ(18, value4.Get()->i);
+
+  // 108 .... -> 102 | (19) (109) (18)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(7, cache.NumberOfAvailableObjects());
+
+  value.Release();
+  value3.Release();
+  value4.Release();
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    cache.EmplaceAndGet(key, 1000 + i, 1.0f).Release();
+    ASSERT_EQ(10, cache.Size());
+    ASSERT_EQ(10, cache.NumberOfAvailableObjects());
+  }
+
+  // 1009 .... -> 1000
+
+  std::vector<LruMultiCache<std::string, TestType>::Accessor> values;
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    values.push_back(cache.Get(key));
+    ASSERT_EQ(1000 + (cache_capacity - 1 - i), values[i].Get()->i);
+  }
+
+  // {} | (1009) ... (1000)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(0, cache.NumberOfAvailableObjects());
+
+  value = cache.EmplaceAndGet(key, 10000, 1.0f);
+  ASSERT_EQ(10000, value.Get()->i);
+
+  // {} | (10000) (1009) ... (1000)
+  ASSERT_EQ(11, cache.Size());
+  ASSERT_EQ(0, cache.NumberOfAvailableObjects());
+
+  value2 = cache.EmplaceAndGet(key, 10001, 1.0f);
+  ASSERT_EQ(10001, value2.Get()->i);
+
+  // {} | (10001) (10000) (1009) ... (1000)
+  ASSERT_EQ(12, cache.Size());
+  ASSERT_EQ(0, cache.NumberOfAvailableObjects());
+
+  values[0].Release();
+  values[1].Release();
+
+  // {} | (10001) (10000) (1007) ... (1000)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(0, cache.NumberOfAvailableObjects());
+
+  for (size_t i = 2; i < cache_capacity; i++) {
+    values[i].Release();
+  }
+
+  // 1000 -> ... -> 1007 | (10001) (10000)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(8, cache.NumberOfAvailableObjects());
+
+  value3 = cache.Get(key);
+  ASSERT_EQ(1000, value3.Get()->i);
+
+  // 1001 -> ... -> 1007 | (10001) (10000) (1000)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(7, cache.NumberOfAvailableObjects());
+
+  value2.Release();
+
+  // 10001 -> 1001 -> ... -> 1007 | (10000) (1000)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(8, cache.NumberOfAvailableObjects());
+
+  value4 = cache.Get(key);
+  ASSERT_EQ(10001, value4.Get()->i);
+
+  // 1001 -> ... -> 1007 | (10001) (10000) (1000)
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(7, cache.NumberOfAvailableObjects());
+}
+
+TEST(LruMultiCache, AutoRelease) {
+  const size_t cache_capacity = 10;
+  LruMultiCache<std::string, TestType> cache(cache_capacity);
+  std::string key = "a";
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    { auto accessor = cache.EmplaceAndGet(key, i, 1.0f); }
+    ASSERT_EQ(i + 1, cache.Size());
+    ASSERT_EQ(i + 1, cache.NumberOfAvailableObjects());
+  }
+
+  // 9 -> 8 .... -> 0
+  {
+    auto value = cache.Get(key);
+
+    ASSERT_EQ(9, value.Get()->i);
+    ASSERT_EQ(10, cache.Size());
+    ASSERT_EQ(9, cache.NumberOfAvailableObjects());
+  }
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    { auto accessor = cache.EmplaceAndGet(key, 10 + i, 1.0f); }
+    ASSERT_EQ(10, cache.Size());
+    ASSERT_EQ(10, cache.NumberOfAvailableObjects());
+  }
+}
+
+TEST(LruMultiCache, Destroy) {
+  const size_t cache_capacity = 10;
+  LruMultiCache<std::string, TestType> cache(cache_capacity);
+  std::string key = "a";
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    { auto accessor = cache.EmplaceAndGet(key, i, 1.0f); }
+    ASSERT_EQ(i + 1, cache.Size());
+    ASSERT_EQ(i + 1, cache.NumberOfAvailableObjects());
+  }
+
+  // 9 -> 8 .... -> 0
+  {
+    auto value = cache.Get(key);
+
+    ASSERT_EQ(9, value.Get()->i);
+    ASSERT_EQ(10, cache.Size());
+    ASSERT_EQ(9, cache.NumberOfAvailableObjects());
+
+    // 8 -> .... -> 0
+    value.Destroy();
+    ASSERT_EQ(9, cache.Size());
+    ASSERT_EQ(9, cache.NumberOfAvailableObjects());
+  }
+
+  ASSERT_EQ(9, cache.Size());
+  ASSERT_EQ(9, cache.NumberOfAvailableObjects());
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    { auto accessor = cache.EmplaceAndGet(key, 10 + i, 1.0f); }
+    ASSERT_EQ(10, cache.Size());
+    ASSERT_EQ(10, cache.NumberOfAvailableObjects());
+  }
+}
+
+TEST(LruMultiCache, RemoveEmptyList) {
+  const size_t cache_capacity = 10;
+  LruMultiCache<std::string, TestType> cache(cache_capacity);
+  std::string key = "a";
+
+  ASSERT_EQ(0, cache.NumberOfKeys());
+
+  auto accessor = cache.EmplaceAndGet(key, 0, 1.0f);
+  ASSERT_EQ(1, cache.NumberOfKeys());
+
+  // Last element is destroyed in "a" list
+  accessor.Destroy();
+  ASSERT_EQ(0, cache.NumberOfKeys());
+
+  // Removed by eviction
+
+  for (size_t i = 0; i < cache_capacity; i++) {
+    cache.EmplaceAndGet(key, i, 1.0f).Release();
+    ASSERT_EQ(i + 1, cache.Size());
+    ASSERT_EQ(i + 1, cache.NumberOfAvailableObjects());
+  }
+
+  // All in "a" list
+  ASSERT_EQ(1, cache.NumberOfKeys());
+
+  std::string key2 = "b";
+
+  for (size_t i = 0; i < (cache_capacity - 1); i++) {
+    cache.EmplaceAndGet(key2, 10 + i, 1.0f).Release();
+    ASSERT_EQ(10, cache.Size());
+    ASSERT_EQ(10, cache.NumberOfAvailableObjects());
+
+    // 9-i in "a" list, i+1 in "b" list
+    ASSERT_EQ(2, cache.NumberOfKeys());
+  }
+
+  cache.EmplaceAndGet(key2, 19, 1.0f).Release();
+
+  // 10 in "b" list, "a" is removed
+  ASSERT_EQ(1, cache.NumberOfKeys());
+}
+
+TEST(LruMultiCache, HashCollision) {
+  const size_t cache_capacity = 10;
+  LruMultiCache<CollidingKey, TestType> cache(cache_capacity);
+
+  const size_t num_of_parallel_keys = 5;
+  CollidingKey keys[num_of_parallel_keys] = {"a", "b", "c", "d", "e"};
+  int value_bases[num_of_parallel_keys] = {1, 10, 100, 1000, 10000};
+
+  for (size_t i = 0; i < 5; i++) {
+    CollidingKey& key = keys[i];
+    int& value_base = value_bases[i];
+
+    // {}
+    ASSERT_EQ(nullptr, cache.Get(key).Get());
+
+    auto value = cache.EmplaceAndGet(key, value_base, 1.0f);
+
+    ASSERT_EQ(nullptr, cache.Get(key).Get());
+
+    value.Release();
+
+    // 1
+    value = cache.Get(key);
+
+    ASSERT_NE(nullptr, value.Get());
+
+    value.Release();
+
+    auto value2 = cache.EmplaceAndGet(key, value_base + 1, 1.0f);
+
+    value = cache.Get(key);
+
+    ASSERT_EQ(value_base, value.Get()->i);
+
+    value.Release();
+    value2.Release();
+    // 2 -> 1
+  }
+
+  ASSERT_EQ(10, cache.Size());
+  ASSERT_EQ(10, cache.NumberOfAvailableObjects());
+
+  for (size_t i = 0; i < 5; i++) {
+    CollidingKey& key = keys[i];
+    int& value_base = value_bases[i];
+
+    // 2 -> 1
+    auto value2 = cache.Get(key);
+
+    ASSERT_EQ(value_base + 1, value2.Get()->i);
+
+    auto value = cache.Get(key);
+
+    ASSERT_EQ(value_base, value.Get()->i);
+
+    value2.Release();
+    value.Release();
+
+    // 1 -> 2
+
+    value = cache.Get(key);
+
+    ASSERT_EQ(value_base, value.Get()->i);
+
+    cache.Rehash();
+
+    value2 = cache.Get(key);
+
+    ASSERT_EQ(value_base + 1, value2.Get()->i);
+
+    value.Release();
+    value2.Release();
+    // 2 -> 1
+
+    cache.Rehash();
+  }
+}
+
+} // namespace impala
\ No newline at end of file
diff --git a/be/src/util/lru-multi-cache.h b/be/src/util/lru-multi-cache.h
new file mode 100644
index 0000000..f704895
--- /dev/null
+++ b/be/src/util/lru-multi-cache.h
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <tuple>
+#include <unordered_map>
+
+#include <boost/intrusive/list.hpp>
+
+#include "gutil/macros.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+/// LruMultiCache is a threadsafe Least Recently Used Cache built on std::unordered_map
+/// and boost::intrusive::list
+///
+/// Features
+///   - can store multiple objects with the same key
+///   - provides unique access to the freshest available stored object
+///   - soft capacity to limit stored objects
+///   - EvictOlderThan() function to age out unused objects
+///   - O(1) operations (except EvictOlderThan(), which can trigger multiple evictions)
+///   - automatic release of objects with RAII accessor
+///   - explicit release/destroy through accessor
+///   - prevents aging by removing empty owning lists
+///
+/// Limitations
+///   - The user has to guarantee that the cache outlives the accessors
+///   - The cache can't enforce the capacity limit: it can overshoot if all objects are in
+///   use and more than the capacity are emplaced; releases will evict the excess objects
+///   in this case
+///   - A bit heavyweight for simple use cases (e.g. ones needing no thread safety, no
+///   unique access, no multi storage, no timestamp-based eviction); it was designed to
+///   store HdfsFileHandles
+///   - Only emplacement of objects is supported, and it locks the cache during
+///   construction.
+///   - Inefficient memory cache usage
+///
+/// Design
+///          _ _ _ _ _
+///         |_|_|_|_|_|     unordered_map buckets
+///          |       |
+///       _ _|    _ _|_
+///      |0|1|   |0|0|1|    owning lists, 0 is available, 1 is in use
+///        \     / /
+///       (LRU calculations)
+///               |
+///        _    _ V  _
+///       |0|->|0|->|0|     LRU list
+///
+///
+///
+///   Cache
+///     - std::unordered_map stores std::list of the objects extended with internal
+///     information, these lists are the owners
+///     - Owning lists are maintaining LRU order only for available objects (order does
+///     not matter for currently used objects) to support O(1) get, the least recently
+///     used, currently available element is at the front of the list, if there is one
+///     - boost::intrusive::list maintains an LRU list of the currently available objects;
+///     this is an intrusive (i.e. non-owning) list, used for eviction
+///
+///   Accessor
+///     - RAII object which provides unique access to an object and releases it
+///     automatically
+///     - Supports explicit release and destroy too (see the usage sketch below)
+///
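+///   Usage sketch (illustrative only; MyValue and its constructor argument are
+///   hypothetical):
+///
+///     LruMultiCache<std::string, MyValue> cache(/* capacity */ 10);
+///     auto accessor = cache.EmplaceAndGet("key", ctor_arg);
+///     MyValue* value = accessor.Get();  // unique access while the accessor is held
+///     // The object is released back to the cache when the accessor goes out of scope;
+///     // accessor.Release() or accessor.Destroy() can be called explicitly instead.
+///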
+
+template <typename KeyType, typename ValueType>
+class LruMultiCache {
+ public:
+  /// Definition is at the bottom, because it uses type definitions from the private scope
+  class Accessor;
+
+  explicit LruMultiCache(size_t capacity = 0);
+
+  /// Making sure the cache stays in place; a reference to it is used for release/destroy
+  LruMultiCache(LruMultiCache&&) = delete;
+  LruMultiCache& operator=(LruMultiCache&&) = delete;
+
+  DISALLOW_COPY_AND_ASSIGN(LruMultiCache);
+
+  /// Returns the number of stored objects in O(1) time
+  size_t Size();
+
+  /// Returns the number of owned lists in O(1) time
+  size_t NumberOfKeys();
+
+  void SetCapacity(size_t new_capacity);
+
+  /// Returns a unique accessor to the freshest available object
+  [[nodiscard]] Accessor Get(const KeyType& key);
+
+  /// Emplace a new object and return a unique accessor to it.
+  /// Variadic template is used to forward the rest of the arguments to the stored
+  /// object's constructor
+  template <typename... Args>
+  [[nodiscard]] Accessor EmplaceAndGet(const KeyType& key, Args&&... args);
+
+  /// Evicts available objects with too old release time
+  void EvictOlderThan(uint64_t oldest_allowed_timestamp);
+
+  /// Number of available objects, O(n) complexity, for testing purposes
+  size_t NumberOfAvailableObjects();
+
+  /// Force rehash by increase bucket count, for testing purposes
+  void Rehash();
+
+ private:
+  /// A doubly linked list with auto_unlink is used for O(1) removal from the LRU list
+  /// during get and evict.
+  typedef boost::intrusive::list_member_hook<
+      boost::intrusive::link_mode<boost::intrusive::auto_unlink>>
+      link_type;
+
+  /// Internal type storing everything needed for O(1) operations
+  struct ValueType_internal {
+    typedef std::list<ValueType_internal> Container_internal;
+
+    /// Variadic template is used to support emplace
+    template <typename... Args>
+    explicit ValueType_internal(LruMultiCache& cache, const KeyType& key,
+        Container_internal& container, Args&&... args);
+
+    bool IsAvailable();
+
+    /// Member hook for LRU list
+    link_type member_hook;
+
+    /// std::unordered_map iterators are invalidated during rehash, but it has stable
+    /// references.
+    /// The LruMultiCache reference is used during release/destroy.
+    LruMultiCache& cache;
+
+    /// Key is used to remove empty owning list in case of eviction or destruction of the
+    /// last element
+    const KeyType& key;
+
+    /// Container reference and iterator is used during release/destroy.
+    /// Container reference could be spared by using key, but this is faster, spares a
+    /// hash call
+    Container_internal& container;
+    typename Container_internal::iterator it;
+
+    /// This is the object the user wants to cache
+    ValueType value;
+
+    /// Timestamp of the last release of the object, used only by EvictOlderThan()
+    uint64_t timestamp_seconds;
+  };
+
+  /// Owning list typedef
+  typedef std::list<ValueType_internal> Container;
+
+  /// Hash table typedef
+  typedef std::unordered_map<KeyType, Container> HashTableType;
+
+  typedef boost::intrusive::member_hook<ValueType_internal, link_type,
+      &ValueType_internal::member_hook>
+      MemberHookOption;
+
+  /// No constant-time size, to support auto-unlink; the cache size is tracked by the class
+  typedef boost::intrusive::list<ValueType_internal, MemberHookOption,
+      boost::intrusive::constant_time_size<false>>
+      LruListType;
+
+  void Release(ValueType_internal* p_value_internal);
+  void Destroy(ValueType_internal* p_value_internal);
+
+  /// Used by public functions: if an object becomes available but the cache is over
+  /// capacity, it evicts one object.
+  void EvictOneIfNeeded();
+
+  /// Used by EvictOlderThan()
+  void EvictOne(ValueType_internal& value_internal);
+
+  HashTableType hash_table_;
+  LruListType lru_list_;
+
+  size_t capacity_;
+  size_t size_;
+
+  /// Protects access to the cache. No need for a read/write lock as there is no costly
+  /// pure read operation.
+  SpinLock lock_;
+
+ public:
+  /// RAII Accessor giving unique access to a cached object
+  class Accessor {
+   public:
+    /// Default construction is used to support usage as an in/out parameter
+    Accessor(ValueType_internal* p_value_internal = nullptr);
+
+    /// Similar interface to unique_ptr, only movable
+    Accessor(Accessor&&);
+    Accessor& operator=(Accessor&&);
+
+    DISALLOW_COPY_AND_ASSIGN(Accessor);
+
+    /// Automatic release in destructor
+    ~Accessor();
+
+    /// Returns a pointer to the cached object.
+    /// Returns nullptr if it's an empty accessor.
+    ValueType* Get();
+
+    /// Returns a pointer to the stored key.
+    /// Returns nullptr if it's an empty accessor.
+    const KeyType* const GetKey() const;
+
+    /// Explicit release of the object
+    void Release();
+
+    /// Explicit destruction of the object
+    void Destroy();
+
+   private:
+    ValueType_internal* p_value_internal_;
+  };
+};
+
+} // namespace impala
\ No newline at end of file
diff --git a/be/src/util/lru-multi-cache.inline.h b/be/src/util/lru-multi-cache.inline.h
new file mode 100644
index 0000000..93bfd6b
--- /dev/null
+++ b/be/src/util/lru-multi-cache.inline.h
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "util/lru-multi-cache.h"
+#include "util/time.h"
+
+namespace impala {
+
+template <typename KeyType, typename ValueType>
+template <typename... Args>
+LruMultiCache<KeyType, ValueType>::ValueType_internal::ValueType_internal(
+    LruMultiCache& cache, const KeyType& key, Container_internal& container,
+    Args&&... args)
+  : cache(cache),
+    key(key),
+    container(container),
+    value(std::forward<Args>(args)...),
+    timestamp_seconds(MonotonicSeconds()) {}
+
+template <typename KeyType, typename ValueType>
+bool LruMultiCache<KeyType, ValueType>::ValueType_internal::IsAvailable() {
+  return member_hook.is_linked();
+}
+
+template <typename KeyType, typename ValueType>
+LruMultiCache<KeyType, ValueType>::Accessor::Accessor(
+    ValueType_internal* p_value_internal)
+  : p_value_internal_(p_value_internal) {}
+
+template <typename KeyType, typename ValueType>
+LruMultiCache<KeyType, ValueType>::Accessor::Accessor(Accessor&& rhs) {
+  p_value_internal_ = std::move(rhs.p_value_internal_);
+  rhs.p_value_internal_ = nullptr;
+}
+template <typename KeyType, typename ValueType>
+auto LruMultiCache<KeyType, ValueType>::Accessor::operator=(Accessor&& rhs) -> Accessor& {
+  p_value_internal_ = std::move(rhs.p_value_internal_);
+  rhs.p_value_internal_ = nullptr;
+  return (*this);
+}
+
+template <typename KeyType, typename ValueType>
+LruMultiCache<KeyType, ValueType>::Accessor::~Accessor() {
+  Release();
+}
+
+template <typename KeyType, typename ValueType>
+ValueType* LruMultiCache<KeyType, ValueType>::Accessor::Get() {
+  if (p_value_internal_) return &(p_value_internal_->value);
+
+  return nullptr;
+}
+
+template <typename KeyType, typename ValueType>
+const KeyType* const LruMultiCache<KeyType, ValueType>::Accessor::GetKey() const {
+  if (p_value_internal_) return &(p_value_internal_->key);
+
+  return nullptr;
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::Accessor::Release() {
+  /// Nullptr check as it has to be dereferenced to get the cache reference
+  /// No nullptr check is needed inside LruMultiCache::Release()
+  if (p_value_internal_) {
+    LruMultiCache& cache = p_value_internal_->cache;
+    cache.Release(p_value_internal_);
+    p_value_internal_ = nullptr;
+  }
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::Accessor::Destroy() {
+  /// Nullptr check as it has to be dereferenced to get the cache reference
+  /// No nullptr check is needed inside LruMultiCache::Destroy()
+  if (p_value_internal_) {
+    LruMultiCache& cache = p_value_internal_->cache;
+    cache.Destroy(p_value_internal_);
+    p_value_internal_ = nullptr;
+  }
+}
+
+template <typename KeyType, typename ValueType>
+LruMultiCache<KeyType, ValueType>::LruMultiCache(size_t capacity)
+  : capacity_(capacity), size_(0) {}
+
+template <typename KeyType, typename ValueType>
+size_t LruMultiCache<KeyType, ValueType>::Size() {
+  std::lock_guard<SpinLock> g(lock_);
+  return size_;
+}
+
+template <typename KeyType, typename ValueType>
+size_t LruMultiCache<KeyType, ValueType>::NumberOfKeys() {
+  std::lock_guard<SpinLock> g(lock_);
+  return hash_table_.size();
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::SetCapacity(size_t new_capacity) {
+  std::lock_guard<SpinLock> g(lock_);
+  capacity_ = new_capacity;
+}
+
+template <typename KeyType, typename ValueType>
+auto LruMultiCache<KeyType, ValueType>::Get(const KeyType& key) -> Accessor {
+  std::lock_guard<SpinLock> g(lock_);
+  auto hash_table_it = hash_table_.find(key);
+
+  // No owning list found with this key, the caller will have to create a new object
+  // with EmplaceAndGet()
+  if (hash_table_it == hash_table_.end()) return Accessor();
+
+  Container& container = hash_table_it->second;
+
+  // Empty containers are deleted automatically
+  DCHECK(!container.empty());
+
+  // All the available elements are in the front, only need to check the first
+  auto container_it = container.begin();
+
+  // No available object found, the caller will have to create a new one with
+  // EmplaceAndGet()
+  if (!container_it->IsAvailable()) return Accessor();
+
+  // Move the object to the back of the owning list as it is no longer available.
+  container.splice(container.end(), container, container_it);
+
+  // Remove the element from the LRU list as it is no longer available
+  container_it->member_hook.unlink();
+
+  return Accessor(&(*container_it));
+}
+
+template <typename KeyType, typename ValueType>
+template <typename... Args>
+auto LruMultiCache<KeyType, ValueType>::EmplaceAndGet(const KeyType& key, Args&&... args)
+    -> Accessor {
+  std::lock_guard<SpinLock> g(lock_);
+
+  // creates default container if there isn't one
+  Container& container = hash_table_[key];
+
+  // Get a reference to the key stored in the unordered_map; the parameter could be a
+  // temporary object, but std::unordered_map has stable references
+  const KeyType& stored_key = hash_table_.find(key)->first;
+
+  // Place it as the last entry for the owning list, as it just got reserved
+  auto container_it = container.emplace(
+      container.end(), (*this), stored_key, container, std::forward<Args>(args)...);
+
+  // Can only be set after emplace
+  container_it->it = container_it;
+
+  size_++;
+
+  // Need to remove the oldest available if the cache is over the capacity
+  EvictOneIfNeeded();
+
+  return Accessor(&(*container_it));
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::Release(ValueType_internal* p_value_internal) {
+  std::lock_guard<SpinLock> g(lock_);
+
+  // This can only be used by the accessor, which already checks for nullptr
+  DCHECK(p_value_internal);
+
+  // Has to be currently not available
+  DCHECK(!p_value_internal->IsAvailable());
+
+  p_value_internal->timestamp_seconds = MonotonicSeconds();
+
+  Container& container = p_value_internal->container;
+
+  // Move the object to the front, keep LRU relation in owning list too to
+  // be able to age out unused objects
+  container.splice(container.begin(), container, p_value_internal->it);
+
+  // Add the object to LRU list too as it is now available for usage
+  lru_list_.push_front(container.front());
+
+  // In case we overshot the capacity already, the cache can evict the oldest one
+  EvictOneIfNeeded();
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::Destroy(ValueType_internal* p_value_internal) {
+  std::lock_guard<SpinLock> g(lock_);
+
+  // This can only be used by the accessor, which already checks for nullptr
+  DCHECK(p_value_internal);
+
+  // Has to be currently not available
+  DCHECK(!p_value_internal->IsAvailable());
+
+  Container& container = p_value_internal->container;
+
+  if (container.size() == 1) {
+    // Last element, owning list can be removed to prevent aging
+    hash_table_.erase(p_value_internal->key);
+  } else {
+    // Remove from owning list
+    container.erase(p_value_internal->it);
+  }
+
+  size_--;
+}
+
+template <typename KeyType, typename ValueType>
+size_t LruMultiCache<KeyType, ValueType>::NumberOfAvailableObjects() {
+  std::lock_guard<SpinLock> g(lock_);
+  return lru_list_.size();
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::Rehash() {
+  std::lock_guard<SpinLock> g(lock_);
+  hash_table_.rehash(hash_table_.bucket_count() + 1);
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::EvictOne(ValueType_internal& value_internal) {
+  // SpinLock is locked by the caller evicting function
+  lock_.DCheckLocked();
+
+  // Has to be available to evict
+  DCHECK(value_internal.IsAvailable());
+
+  // Remove from LRU cache
+  value_internal.member_hook.unlink();
+
+  Container& container = value_internal.container;
+
+  if (container.size() == 1) {
+    // Last element, owning list can be removed to prevent aging
+    hash_table_.erase(value_internal.key);
+  } else {
+    // Remove from owning list
+    container.erase(value_internal.it);
+  }
+
+  size_--;
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::EvictOneIfNeeded() {
+  // SpinLock is locked by the caller public function
+  lock_.DCheckLocked();
+
+  if (!lru_list_.empty() && size_ > capacity_) {
+    EvictOne(lru_list_.back());
+  }
+}
+
+template <typename KeyType, typename ValueType>
+void LruMultiCache<KeyType, ValueType>::EvictOlderThan(
+    uint64_t oldest_allowed_timestamp) {
+  std::lock_guard<SpinLock> g(lock_);
+
+  // Stop eviction if
+  //   - there are no more available (i.e. evictable) objects
+  //   - cache size is below capacity and the oldest object is not older than the limit
+  while (!lru_list_.empty()
+      && (size_ > capacity_
+             || lru_list_.back().timestamp_seconds < oldest_allowed_timestamp)) {
+    EvictOne(lru_list_.back());
+  }
+}
+
+} // namespace impala
\ No newline at end of file