You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/12/01 17:53:59 UTC

[accumulo] branch master updated: ACCUMULO-4641 Added loading to cache API (#321)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new d1d9344  ACCUMULO-4641 Added loading to cache API (#321)
d1d9344 is described below

commit d1d9344082702206278b3eee442351479171e644
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Fri Dec 1 12:53:56 2017 -0500

    ACCUMULO-4641 Added loading to cache API (#321)
---
 .../accumulo/core/client/rfile/RFileScanner.java   |  44 ++-
 .../core/client/rfile/RFileSummariesRetriever.java |   5 +-
 .../accumulo/core/file/blockfile/ABlockReader.java |   7 +-
 .../core/file/blockfile/cache/BlockCache.java      |  46 ++-
 .../core/file/blockfile/cache/CacheEntry.java      |  22 +-
 .../core/file/blockfile/cache/lru/CachedBlock.java |  66 +++-
 .../file/blockfile/cache/lru/LruBlockCache.java    |  76 +++-
 .../cache/lru/SynchronousLoadingBlockCache.java    | 139 +++++++
 .../blockfile/cache/tinylfu/TinyLfuBlockCache.java | 160 ++++++--
 .../file/blockfile/impl/CachableBlockFile.java     | 431 +++++++++++----------
 .../accumulo/core/file/rfile/BlockIndex.java       |  31 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java    | 180 +++------
 .../accumulo/core/summary/SummaryReader.java       |  37 +-
 .../file/blockfile/cache/TestLruBlockCache.java    |  16 +-
 .../accumulo/core/file/rfile/BlockIndexTest.java   |  17 +-
 .../core/file/rfile/MultiLevelIndexTest.java       |   2 +-
 .../apache/accumulo/core/file/rfile/RFileTest.java |   3 +-
 17 files changed, 847 insertions(+), 435 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 301dfc5..15c89ed 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -23,10 +23,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
@@ -95,10 +97,6 @@ class RFileScanner extends ScannerOptions implements Scanner {
   // block is accessed the entire thing is read into memory immediately allocating and deallocating
   // a decompressor. If the user does not read all data, no decompressors are left allocated.
   private static class NoopCache implements BlockCache {
-    @Override
-    public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
-      return null;
-    }
 
     @Override
     public CacheEntry cacheBlock(String blockName, byte[] buf) {
@@ -134,6 +132,43 @@ class RFileScanner extends ScannerOptions implements Scanner {
         }
       };
     }
+
+    @Override
+    public CacheEntry getBlock(String blockName, Loader loader) {
+      Map<String,Loader> depLoaders = loader.getDependencies();
+      Map<String,byte[]> depData;
+
+      switch (depLoaders.size()) {
+        case 0:
+          depData = Collections.emptyMap();
+          break;
+        case 1:
+          Entry<String,Loader> entry = depLoaders.entrySet().iterator().next();
+          depData = Collections.singletonMap(entry.getKey(), getBlock(entry.getKey(), entry.getValue()).getBuffer());
+          break;
+        default:
+          depData = new HashMap<>();
+          depLoaders.forEach((k, v) -> depData.put(k, getBlock(k, v).getBuffer()));
+      }
+
+      byte[] data = loader.load(Integer.MAX_VALUE, depData);
+
+      return new CacheEntry() {
+
+        @Override
+        public byte[] getBuffer() {
+          return data;
+        }
+
+        @Override
+        public <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+          return null;
+        }
+
+        @Override
+        public void indexWeightChanged() {}
+      };
+    }
   }
 
   RFileScanner(Opts opts) {
@@ -305,6 +340,7 @@ class RFileScanner extends ScannerOptions implements Scanner {
       RFileSource[] sources = opts.in.getSources();
       List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
       for (int i = 0; i < sources.length; i++) {
+        // TODO may have been a bug with multiple files and caching in older version...
         FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
         readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + i, inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
             DefaultConfiguration.getInstance())));
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
index d3a83b0..e4b3e05 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
@@ -86,9 +86,8 @@ class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions
     RFileSource[] sources = in.getSources();
     try {
       SummaryCollection all = new SummaryCollection();
-      for (int i = 0; i < sources.length; i++) {
-        RFileSource source = sources[i];
-        SummaryReader fileSummary = SummaryReader.load("source-"+i, conf, acuconf, source.getInputStream(), source.getLength(), summarySelector, factory);
+      for (RFileSource source : sources) {
+        SummaryReader fileSummary = SummaryReader.load(conf, acuconf, source.getInputStream(), source.getLength(), summarySelector, factory);
         SummaryCollection sc = fileSummary.getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow)));
         all.merge(sc, factory);
       }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
index 9f485d8..a0a7da8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
@@ -19,6 +19,9 @@ package org.apache.accumulo.core.file.blockfile;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
 
 /*
  * Minimal interface to read a block from a
@@ -48,7 +51,9 @@ public interface ABlockReader extends DataInput {
    */
   int getPosition();
 
-  <T> T getIndex(Class<T> clazz);
+  <T extends Weighbable> T getIndex(Supplier<T> supplier);
+
+  void indexWeightChanged();
 
   byte[] getBuffer();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
index b27c918..d5f8215 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
@@ -17,6 +17,10 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache;
 
+import java.util.Map;
+
+import org.apache.accumulo.core.file.blockfile.cache.lru.SynchronousLoadingBlockCache;
+
 /**
  * Block cache interface.
  */
@@ -29,29 +33,47 @@ public interface BlockCache {
    *          Zero-based file block number.
    * @param buf
    *          The block contents wrapped in a ByteBuffer.
-   * @param inMemory
-   *          Whether block should be treated as in-memory
    */
-  CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory);
+  CacheEntry cacheBlock(String blockName, byte buf[]);
 
   /**
-   * Add block to cache (defaults to not in-memory).
+   * Fetch block from cache.
    *
    * @param blockName
-   *          Zero-based file block number.
-   * @param buf
-   *          The block contents wrapped in a ByteBuffer.
+   *          Block name to fetch.
+   * @return Block or null if block is not in the cache.
    */
-  CacheEntry cacheBlock(String blockName, byte buf[]);
+  CacheEntry getBlock(String blockName);
+
+  public static interface Loader {
+    /**
+     * The cache blocks that this loader depends on. If a loader has no dependencies, then it should return an empty map. All dependencies must be loaded before
+     * calling {@link #load(int, Map)}.
+     */
+    Map<String,Loader> getDependencies();
+
+    /**
+     * Loads a block. Anything returned by {@link #getDependencies()} should be loaded and passed.
+     *
+     * @param maxSize
+     *          This is the maximum block size that will be cached.
+     * @return The loaded block or null if loading the block would exceed maxSize.
+     */
+    byte[] load(int maxSize, Map<String,byte[]> dependencies);
+  }
 
   /**
-   * Fetch block from cache.
+   * This method allows a cache to prevent concurrent loads of the same block. However a cache implementation is not required to prevent concurrent loads.
+   * {@link SynchronousLoadingBlockCache} is an abstract class that a cache can extent which does prevent concurrent loading of the same block.
+   *
    *
    * @param blockName
-   *          Block number to fetch.
-   * @return Block or null if block is not in the cache.
+   *          Block name to fetch
+   * @param loader
+   *          If the block is not present in the cache, the loader can be called to load it.
+   * @return Block or null if block is not in the cache or didn't load.
    */
-  CacheEntry getBlock(String blockName);
+  CacheEntry getBlock(String blockName, Loader loader);
 
   /**
    * Get the maximum amount of on heap memory this cache will use.
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
index 2faf696..b1aeced 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
@@ -16,11 +16,27 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache;
 
+import java.util.function.Supplier;
+
 public interface CacheEntry {
-  byte[] getBuffer();
 
-  Object getIndex();
+  public static interface Weighbable {
+    int weight();
+  }
+
+  byte[] getBuffer();
 
-  void setIndex(Object idx);
+  /**
+   * Optionally cache what is returned by the supplier along with this cache entry. If caching what is returned by the supplier is not supported, its ok to
+   * return null.
+   *
+   * <p>
+   * This method exists to support building indexes of frequently accessed cached data.
+   */
+  <T extends Weighbable> T getIndex(Supplier<T> supplier);
 
+  /**
+   * The object optionally stored by {@link #getIndex(Supplier)} is a mutable object. Accumulo will call this method whenever the weight of that object changes.
+   */
+  void indexWeightChanged();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
index c1124f4..ec72283 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
@@ -18,8 +18,10 @@
 package org.apache.accumulo.core.file.blockfile.cache.lru;
 
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
-import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
 import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
 import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
 
@@ -30,10 +32,10 @@ import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
  * Makes the block memory-aware with {@link HeapSize} and Comparable to sort by access time for the LRU. It also takes care of priority by either instantiating
  * as in-memory or handling the transition from single to multiple access.
  */
-public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntry {
+public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
 
   public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * SizeConstants.SIZEOF_LONG)
-      + ClassSize.STRING + ClassSize.BYTE_BUFFER);
+      + ClassSize.STRING + ClassSize.BYTE_BUFFER + ClassSize.REFERENCE);
 
   public static enum BlockPriority {
     /**
@@ -50,18 +52,17 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
     MEMORY
   }
 
+  private byte[] buffer;
   private final String blockName;
-  private final byte buf[];
   private volatile long accessTime;
-  private long size;
+  private volatile long recordedSize;
   private BlockPriority priority;
-  private Object index;
+  private Weighbable index;
 
   public CachedBlock(String blockName, byte buf[], long accessTime, boolean inMemory) {
+    this.buffer = buf;
     this.blockName = blockName;
-    this.buf = buf;
     this.accessTime = accessTime;
-    this.size = ClassSize.align(blockName.length()) + ClassSize.align(buf.length) + PER_BLOCK_OVERHEAD;
     if (inMemory) {
       this.priority = BlockPriority.MEMORY;
     } else {
@@ -81,7 +82,10 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
 
   @Override
   public long heapSize() {
-    return size;
+    if (recordedSize < 0) {
+      throw new IllegalStateException("Block was evicted");
+    }
+    return recordedSize;
   }
 
   @Override
@@ -101,11 +105,6 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
     return this.accessTime < that.accessTime ? 1 : -1;
   }
 
-  @Override
-  public byte[] getBuffer() {
-    return this.buf;
-  }
-
   public String getName() {
     return this.blockName;
   }
@@ -114,13 +113,40 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
     return this.priority;
   }
 
-  @Override
-  public Object getIndex() {
-    return index;
+  public byte[] getBuffer() {
+    return buffer;
   }
 
-  @Override
-  public void setIndex(Object idx) {
-    this.index = idx;
+  @SuppressWarnings("unchecked")
+  public synchronized <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+    if (index == null && recordedSize >= 0) {
+      index = supplier.get();
+    }
+
+    return (T) index;
+  }
+
+  public synchronized long recordSize(AtomicLong totalSize) {
+    if (recordedSize >= 0) {
+      long indexSize = (index == null) ? 0 : index.weight();
+      long newSize = ClassSize.align(blockName.length()) + ClassSize.align(buffer.length) + PER_BLOCK_OVERHEAD + indexSize;
+      long delta = newSize - recordedSize;
+      recordedSize = newSize;
+      return totalSize.addAndGet(delta);
+    }
+
+    throw new IllegalStateException("Block was evicted");
+  }
+
+  public synchronized long evicted(AtomicLong totalSize) {
+    if (recordedSize >= 0) {
+      totalSize.addAndGet(recordedSize * -1);
+      long tmp = recordedSize;
+      recordedSize = -1;
+      index = null;
+      return tmp;
+    }
+
+    throw new IllegalStateException("already evicted");
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
index f27fd43..7211f5d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
@@ -65,7 +66,7 @@ import org.slf4j.LoggerFactory;
  * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then
  * uses the priority chunk sizes to evict fairly according to the relative sizes and usage.
  */
-public class LruBlockCache implements BlockCache, HeapSize {
+public class LruBlockCache extends SynchronousLoadingBlockCache implements BlockCache, HeapSize {
 
   private static final Logger log = LoggerFactory.getLogger(LruBlockCache.class);
 
@@ -114,6 +115,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    *          block cache configuration
    */
   public LruBlockCache(final LruBlockCacheConfiguration conf) {
+    super();
     this.conf = conf;
 
     int mapInitialSize = (int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize());
@@ -145,6 +147,43 @@ public class LruBlockCache implements BlockCache, HeapSize {
     return overhead;
   }
 
+  /*
+   * This class exists so that every cache entry does not have a reference to the cache.
+   */
+  private class LruCacheEntry implements CacheEntry {
+    private final CachedBlock block;
+
+    LruCacheEntry(CachedBlock block) {
+      this.block = block;
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      return block.getBuffer();
+    }
+
+    @Override
+    public <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+      return block.getIndex(supplier);
+    }
+
+    @Override
+    public void indexWeightChanged() {
+      long newSize = block.recordSize(size);
+      if (newSize > acceptableSize() && !evictionInProgress) {
+        runEviction();
+      }
+    }
+  }
+
+  private CacheEntry wrap(CachedBlock cb) {
+    if (cb == null) {
+      return null;
+    }
+
+    return new LruCacheEntry(cb);
+  }
+
   // BlockCache implementation
 
   /**
@@ -160,7 +199,6 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * @param inMemory
    *          if block is in-memory
    */
-  @Override
   public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
     CachedBlock cb = map.get(blockName);
     if (cb != null) {
@@ -175,7 +213,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
         cb.access(count.incrementAndGet());
       } else {
         // Actually added block to cache
-        long newSize = size.addAndGet(cb.heapSize());
+        long newSize = cb.recordSize(size);
         elements.incrementAndGet();
         if (newSize > acceptableSize() && !evictionInProgress) {
           runEviction();
@@ -183,7 +221,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
       }
     }
 
-    return cb;
+    return wrap(cb);
   }
 
   /**
@@ -210,7 +248,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * @return buffer of specified block name, or null if not in cache
    */
   @Override
-  public CachedBlock getBlock(String blockName) {
+  public CacheEntry getBlock(String blockName) {
     CachedBlock cb = map.get(blockName);
     if (cb == null) {
       stats.miss();
@@ -218,15 +256,25 @@ public class LruBlockCache implements BlockCache, HeapSize {
     }
     stats.hit();
     cb.access(count.incrementAndGet());
-    return cb;
+    return wrap(cb);
+  }
+
+  @Override
+  protected CacheEntry getBlockNoStats(String blockName) {
+    CachedBlock cb = map.get(blockName);
+    if (cb != null) {
+      cb.access(count.incrementAndGet());
+    }
+    return wrap(cb);
   }
 
   protected long evictBlock(CachedBlock block) {
-    map.remove(block.getName());
-    size.addAndGet(-1 * block.heapSize());
-    elements.decrementAndGet();
-    stats.evicted();
-    return block.heapSize();
+    if (map.remove(block.getName()) != null) {
+      elements.decrementAndGet();
+      stats.evicted();
+      return block.evicted(size);
+    }
+    return 0;
   }
 
   /**
@@ -386,6 +434,11 @@ public class LruBlockCache implements BlockCache, HeapSize {
     return this.conf.getMaxSize();
   }
 
+  @Override
+  public int getMaxEntrySize() {
+    return (int) Math.min(Integer.MAX_VALUE, getMaxSize());
+  }
+
   /**
    * Get the current size of this cache.
    *
@@ -507,6 +560,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * <p>
    * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
    */
+  @Override
   public CacheStats getStats() {
     return this.stats;
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/SynchronousLoadingBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/SynchronousLoadingBlockCache.java
new file mode 100644
index 0000000..82c5ef4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/SynchronousLoadingBlockCache.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+
+/**
+ * This class implements loading in such a way that load operations for the same block will not run concurrently.
+ */
+public abstract class SynchronousLoadingBlockCache implements BlockCache {
+
+  private final Lock[] loadLocks;
+
+  /**
+   * @param numLocks
+   *          this controls how many load operations can run concurrently
+   */
+  SynchronousLoadingBlockCache(int numLocks) {
+    loadLocks = new Lock[numLocks];
+    for (int i = 0; i < loadLocks.length; i++) {
+      loadLocks[i] = new ReentrantLock();
+    }
+  }
+
+  public SynchronousLoadingBlockCache() {
+    this(2017);
+  }
+
+  private Map<String,byte[]> resolveDependencies(Map<String,Loader> loaderDeps) {
+    Map<String,byte[]> depData;
+
+    switch (loaderDeps.size()) {
+      case 0:
+        depData = Collections.emptyMap();
+        break;
+      case 1: {
+        Entry<String,Loader> entry = loaderDeps.entrySet().iterator().next();
+        CacheEntry dce = getBlock(entry.getKey(), entry.getValue());
+        if (dce == null) {
+          depData = null;
+        } else {
+          depData = Collections.singletonMap(entry.getKey(), dce.getBuffer());
+        }
+        break;
+      }
+      default: {
+        depData = new HashMap<>();
+        Set<Entry<String,Loader>> es = loaderDeps.entrySet();
+        for (Entry<String,Loader> entry : es) {
+          CacheEntry dce = getBlock(entry.getKey(), entry.getValue());
+          if (dce == null) {
+            depData = null;
+            break;
+          }
+
+          depData.put(entry.getKey(), dce.getBuffer());
+        }
+        break;
+      }
+    }
+
+    return depData;
+  }
+
+  /**
+   * Get the maximum size of an individual cache entry.
+   */
+  protected abstract int getMaxEntrySize();
+
+  /**
+   * Get a block from the cache without changing any stats the cache is keeping.
+   */
+  protected abstract CacheEntry getBlockNoStats(String blockName);
+
+  @Override
+  public CacheEntry getBlock(String blockName, Loader loader) {
+
+    CacheEntry ce = getBlock(blockName);
+    if (ce != null) {
+      return ce;
+    }
+
+    // intentionally done before getting lock
+    Map<String,byte[]> depData = resolveDependencies(loader.getDependencies());
+    if (depData == null) {
+      return null;
+    }
+
+    int lockIndex = (blockName.hashCode() & 0x7fffffff) % loadLocks.length;
+    Lock loadLock = loadLocks[lockIndex];
+
+    try {
+      loadLock.lock();
+
+      // check again after getting lock, could have loaded while waiting on lock
+      ce = getBlockNoStats(blockName);
+      if (ce != null) {
+        return ce;
+      }
+
+      // not in cache so load data
+      byte[] data = loader.load(getMaxEntrySize(), depData);
+      if (data == null) {
+        return null;
+      }
+
+      // attempt to add data to cache
+      return cacheBlock(blockName, data);
+    } finally {
+      loadLock.unlock();
+    }
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
index 6edfb01..6339e8e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@ -17,15 +17,19 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
 
-import static java.util.Objects.requireNonNull;
-
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager.Configuration;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
 import org.apache.accumulo.core.file.blockfile.cache.CacheType;
 import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
 import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
@@ -62,10 +66,9 @@ public final class TinyLfuBlockCache implements BlockCache {
           return keyWeight + block.weight();
         }).maximumWeight(conf.getMaxSize(type)).recordStats().build();
     policy = cache.policy().eviction().get();
-    statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
-        .build());
+    statsExecutor = Executors
+        .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
     statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS);
-
   }
 
   @Override
@@ -80,23 +83,14 @@ public final class TinyLfuBlockCache implements BlockCache {
 
   @Override
   public CacheEntry getBlock(String blockName) {
-    return cache.getIfPresent(blockName);
+    return wrap(blockName, cache.getIfPresent(blockName));
   }
 
   @Override
   public CacheEntry cacheBlock(String blockName, byte[] buffer) {
-    return cache.asMap().compute(blockName, (key, block) -> {
-      if (block == null) {
-        return new Block(buffer);
-      }
-      block.buffer = buffer;
-      return block;
-    });
-  }
-
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) {
-    return cacheBlock(blockName, buffer);
+    return wrap(blockName, cache.asMap().compute(blockName, (key, block) -> {
+      return new Block(buffer);
+    }));
   }
 
   @Override
@@ -123,31 +117,139 @@ public final class TinyLfuBlockCache implements BlockCache {
     log.debug(cache.stats().toString());
   }
 
-  private static final class Block implements CacheEntry {
-    private volatile byte[] buffer;
-    private volatile Object index;
+  private static final class Block {
+
+    private final byte[] buffer;
+    private Weighbable index;
+    private volatile int lastIndexWeight;
 
     Block(byte[] buffer) {
-      this.buffer = requireNonNull(buffer);
+      this.buffer = buffer;
+      this.lastIndexWeight = buffer.length / 100;
+    }
+
+    int weight() {
+      int indexWeight = lastIndexWeight + SizeConstants.SIZEOF_INT + ClassSize.REFERENCE;
+      return indexWeight + ClassSize.align(getBuffer().length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
     }
 
-    @Override
     public byte[] getBuffer() {
       return buffer;
     }
 
+    @SuppressWarnings("unchecked")
+    public synchronized <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+      if (index == null) {
+        index = supplier.get();
+      }
+
+      return (T) index;
+    }
+
+    public synchronized boolean indexWeightChanged() {
+      if (index != null) {
+        int indexWeight = index.weight();
+        if (indexWeight > lastIndexWeight) {
+          lastIndexWeight = indexWeight;
+          return true;
+        }
+      }
+
+      return false;
+    }
+  }
+
+  private CacheEntry wrap(String cacheKey, Block block) {
+    if (block != null) {
+      return new TlfuCacheEntry(cacheKey, block);
+    }
+
+    return null;
+  }
+
+  private class TlfuCacheEntry implements CacheEntry {
+
+    private final String cacheKey;
+    private final Block block;
+
+    TlfuCacheEntry(String k, Block b) {
+      this.cacheKey = k;
+      this.block = b;
+    }
+
     @Override
-    public Object getIndex() {
-      return index;
+    public byte[] getBuffer() {
+      return block.getBuffer();
     }
 
     @Override
-    public void setIndex(Object index) {
-      this.index = index;
+    public <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+      return block.getIndex(supplier);
     }
 
-    int weight() {
-      return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
+    @Override
+    public void indexWeightChanged() {
+      if (block.indexWeightChanged()) {
+        // update weight
+        cache.put(cacheKey, block);
+      }
+    }
+  }
+
+  private Block load(String key, Loader loader, Map<String,byte[]> resolvedDeps) {
+    byte[] data = loader.load((int) Math.min(Integer.MAX_VALUE, policy.getMaximum()), resolvedDeps);
+    if (data == null) {
+      return null;
     }
+
+    return new Block(data);
+  }
+
+  private Map<String,byte[]> resolveDependencies(Map<String,Loader> deps) {
+    if (deps.size() == 1) {
+      Entry<String,Loader> entry = deps.entrySet().iterator().next();
+      CacheEntry ce = getBlock(entry.getKey(), entry.getValue());
+      if (ce == null) {
+        return null;
+      }
+      return Collections.singletonMap(entry.getKey(), ce.getBuffer());
+    } else {
+      HashMap<String,byte[]> resolvedDeps = new HashMap<>();
+      for (Entry<String,Loader> entry : deps.entrySet()) {
+        CacheEntry ce = getBlock(entry.getKey(), entry.getValue());
+        if (ce == null) {
+          return null;
+        }
+        resolvedDeps.put(entry.getKey(), ce.getBuffer());
+      }
+      return resolvedDeps;
+    }
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName, Loader loader) {
+    Map<String,Loader> deps = loader.getDependencies();
+    Block block;
+    if (deps.size() == 0) {
+      block = cache.get(blockName, k -> load(blockName, loader, Collections.emptyMap()));
+    } else {
+      // This code path exist to handle the case where dependencies may need to be loaded. Loading dependencies will access the cache. Cache load functions
+      // should not access the cache.
+      block = cache.getIfPresent(blockName);
+
+      if (block == null) {
+        // Load dependencies outside of cache load function.
+        Map<String,byte[]> resolvedDeps = resolveDependencies(deps);
+        if (resolvedDeps == null) {
+          return null;
+        }
+
+        // Use asMap because it will not increment stats, getIfPresent recorded a miss above. Use computeIfAbsent because it is possible another thread loaded
+        // the data since this thread called getIfPresent.
+        block = cache.asMap().computeIfAbsent(blockName, k -> load(blockName, loader, resolvedDeps));
+      }
+    }
+
+    return wrap(blockName, block);
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 3ecb5ca..d5ab62c 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -16,13 +16,16 @@
  */
 package org.apache.accumulo.core.file.blockfile.impl;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.ref.SoftReference;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.file.blockfile.ABlockReader;
@@ -30,7 +33,9 @@ import org.apache.accumulo.core.file.blockfile.ABlockWriter;
 import org.apache.accumulo.core.file.blockfile.BlockFileReader;
 import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache.Loader;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
@@ -45,8 +50,9 @@ import org.apache.hadoop.fs.Seekable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
- *
  * This is a wrapper class for BCFile that includes a cache for independent caches for datablocks and metadatablocks
  */
 
@@ -138,267 +144,279 @@ public class CachableBlockFile {
 
   }
 
+  private static interface IoeSupplier<T> {
+    T get() throws IOException;
+  }
+
   /**
-   *
-   *
    * Class wraps the BCFile reader.
-   *
    */
   public static class Reader implements BlockFileReader {
     private final RateLimiter readLimiter;
-    private BCFile.Reader _bc;
-    private final String fileName;
-    private BlockCache _dCache = null;
-    private BlockCache _iCache = null;
-    private InputStream fin = null;
-    private FileSystem fs;
-    private Configuration conf;
+    // private BCFile.Reader _bc;
+    private final String cacheId;
+    private final BlockCache _dCache;
+    private final BlockCache _iCache;
+    private volatile InputStream fin = null;
     private boolean closed = false;
-    private AccumuloConfiguration accumuloConfiguration = null;
+    private final Configuration conf;
+    private final AccumuloConfiguration accumuloConfiguration;
+
+    private final IoeSupplier<InputStream> inputSupplier;
+    private final IoeSupplier<Long> lengthSupplier;
+    private final AtomicReference<BCFile.Reader> bcfr = new AtomicReference<>();
+
+    private static final String ROOT_BLOCK_NAME = "!RootData";
 
     // ACCUMULO-4716 - Define MAX_ARRAY_SIZE smaller than Integer.MAX_VALUE to prevent possible OutOfMemory
     // errors when allocating arrays - described in stackoverflow post: https://stackoverflow.com/a/8381338
     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
-    private interface BlockLoader {
-      BlockReader get() throws IOException;
+    private BCFile.Reader getBCFile(byte[] serializedMetadata) throws IOException {
 
-      String getInfo();
-    }
+      BCFile.Reader reader = bcfr.get();
+      if (reader == null) {
+        RateLimitedInputStream fsIn = new RateLimitedInputStream((InputStream & Seekable) inputSupplier.get(), readLimiter);
+        BCFile.Reader tmpReader;
+        if (serializedMetadata == null) {
+          tmpReader = new BCFile.Reader(fsIn, lengthSupplier.get(), conf, accumuloConfiguration);
+        } else {
+          tmpReader = new BCFile.Reader(serializedMetadata, fsIn, conf, accumuloConfiguration);
+        }
 
-    private class OffsetBlockLoader implements BlockLoader {
+        if (!bcfr.compareAndSet(null, tmpReader)) {
+          fsIn.close();
+          tmpReader.close();
+          return bcfr.get();
+        } else {
+          fin = fsIn;
+          return tmpReader;
+        }
+      }
 
-      private int blockIndex;
+      return reader;
+    }
 
-      OffsetBlockLoader(int blockIndex) {
-        this.blockIndex = blockIndex;
+    private BCFile.Reader getBCFile() throws IOException {
+      if (_iCache != null) {
+        CacheEntry mce = _iCache.getBlock(cacheId + ROOT_BLOCK_NAME, new BCFileLoader());
+        if (mce != null) {
+          return getBCFile(mce.getBuffer());
+        }
       }
 
+      return getBCFile(null);
+    }
+
+    private class BCFileLoader implements Loader {
+
       @Override
-      public BlockReader get() throws IOException {
-        return getBCFile(accumuloConfiguration).getDataBlock(blockIndex);
+      public Map<String,Loader> getDependencies() {
+        return Collections.emptyMap();
       }
 
       @Override
-      public String getInfo() {
-        return "" + blockIndex;
+      public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
+        try {
+          return getBCFile(null).serializeMetadata(maxSize);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
       }
-
     }
 
-    private class RawBlockLoader implements BlockLoader {
-
+    private class RawBlockLoader extends BaseBlockLoader {
       private long offset;
       private long compressedSize;
       private long rawSize;
 
-      RawBlockLoader(long offset, long compressedSize, long rawSize) {
+      private RawBlockLoader(long offset, long compressedSize, long rawSize, boolean loadingMeta) {
+        super(loadingMeta);
         this.offset = offset;
         this.compressedSize = compressedSize;
         this.rawSize = rawSize;
       }
 
       @Override
-      public BlockReader get() throws IOException {
-        return getBCFile(accumuloConfiguration).getDataBlock(offset, compressedSize, rawSize);
+      BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException {
+        if (rawSize > Math.min(maxSize, MAX_ARRAY_SIZE)) {
+          return null;
+        }
+        return bcfr.getDataBlock(offset, compressedSize, rawSize);
       }
 
       @Override
-      public String getInfo() {
-        return "" + offset + "," + compressedSize + "," + rawSize;
+      String getBlockId() {
+        return "raw-(" + offset + "," + compressedSize + "," + rawSize + ")";
       }
     }
 
-    private class MetaBlockLoader implements BlockLoader {
-
-      private String name;
-      private AccumuloConfiguration accumuloConfiguration;
+    private class OffsetBlockLoader extends BaseBlockLoader {
+      private int blockIndex;
 
-      MetaBlockLoader(String name, AccumuloConfiguration accumuloConfiguration) {
-        this.name = name;
-        this.accumuloConfiguration = accumuloConfiguration;
+      private OffsetBlockLoader(int blockIndex, boolean loadingMeta) {
+        super(loadingMeta);
+        this.blockIndex = blockIndex;
       }
 
       @Override
-      public BlockReader get() throws IOException {
-        return getBCFile(accumuloConfiguration).getMetaBlock(name);
+      BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException {
+        if (bcfr.getDataBlockRawSize(blockIndex) > Math.min(maxSize, MAX_ARRAY_SIZE)) {
+          return null;
+        }
+        return bcfr.getDataBlock(blockIndex);
       }
 
       @Override
-      public String getInfo() {
-        return name;
+      String getBlockId() {
+        return "bi-" + blockIndex;
       }
     }
 
-    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
-    }
-
-    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, RateLimiter readLimiter,
-        AccumuloConfiguration accumuloConfiguration) throws IOException {
-
-      /*
-       * Grab path create input stream grab len create file
-       */
-
-      fileName = dataFile.toString();
-      this._dCache = data;
-      this._iCache = index;
-      this.fs = fs;
-      this.conf = conf;
-      this.accumuloConfiguration = accumuloConfiguration;
-      this.readLimiter = readLimiter;
-    }
-
-    public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf, BlockCache data,
-        BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this.fileName = cacheId;
-      this._dCache = data;
-      this._iCache = index;
-      this.readLimiter = null;
-      init(fsin, len, conf, accumuloConfiguration);
-    }
+    private class MetaBlockLoader extends BaseBlockLoader {
+      String blockName;
 
-    public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf,
-        AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this.fileName = cacheId;
-      this.readLimiter = null;
-      init(fsin, len, conf, accumuloConfiguration);
-    }
-
-    private <InputStreamT extends InputStream & Seekable> void init(InputStreamT fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration);
-    }
-
-    private synchronized BCFile.Reader getBCFile(AccumuloConfiguration accumuloConfiguration) throws IOException {
-      if (closed)
-        throw new IllegalStateException("File " + fileName + " is closed");
-
-      if (_bc == null) {
-        // lazily open file if needed
-        Path path = new Path(fileName);
-        RateLimitedInputStream fsIn = new RateLimitedInputStream(fs.open(path), this.readLimiter);
-        fin = fsIn;
-        init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
+      MetaBlockLoader(String blockName) {
+        super(true);
+        this.blockName = blockName;
       }
 
-      return _bc;
-    }
-
-    public BlockRead getCachedMetaBlock(String blockName) throws IOException {
-      String _lookup = fileName + "M" + blockName;
-
-      if (_iCache != null) {
-        CacheEntry cacheEntry = _iCache.getBlock(_lookup);
-
-        if (cacheEntry != null) {
-          return new CachedBlockRead(cacheEntry, cacheEntry.getBuffer());
+      @Override
+      BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException {
+        if (bcfr.getMetaBlockRawSize(blockName) > Math.min(maxSize, MAX_ARRAY_SIZE)) {
+          return null;
         }
-
+        return bcfr.getMetaBlock(blockName);
       }
 
-      return null;
-    }
-
-    public BlockRead cacheMetaBlock(String blockName, BlockReader _currBlock) throws IOException {
-      String _lookup = fileName + "M" + blockName;
-      return cacheBlock(_lookup, _iCache, _currBlock, blockName);
+      @Override
+      String getBlockId() {
+        return "meta-" + blockName;
+      }
     }
 
-    public void cacheMetaBlock(String blockName, byte[] b) {
+    private abstract class BaseBlockLoader implements Loader {
 
-      if (_iCache == null)
-        return;
+      abstract BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException;
 
-      String _lookup = fileName + "M" + blockName;
-      try {
-        _iCache.cacheBlock(_lookup, b);
-      } catch (Exception e) {
-        log.warn("Already cached block: " + _lookup, e);
-      }
-    }
+      abstract String getBlockId();
 
-    private BlockRead getBlock(String _lookup, BlockCache cache, BlockLoader loader) throws IOException {
+      private boolean loadingMetaBlock;
 
-      BlockReader _currBlock;
-
-      if (cache != null) {
-        CacheEntry cb = null;
-        cb = cache.getBlock(_lookup);
+      public BaseBlockLoader(boolean loadingMetaBlock) {
+        super();
+        this.loadingMetaBlock = loadingMetaBlock;
+      }
 
-        if (cb != null) {
-          return new CachedBlockRead(cb, cb.getBuffer());
+      @Override
+      public Map<String,Loader> getDependencies() {
+        if (bcfr.get() == null && loadingMetaBlock) {
+          String _lookup = cacheId + ROOT_BLOCK_NAME;
+          return Collections.singletonMap(_lookup, new BCFileLoader());
         }
-
+        return Collections.emptyMap();
       }
-      /**
-       * grab the currBlock at this point the block is still in the data stream
-       *
-       */
-      _currBlock = loader.get();
 
-      /**
-       * If the block is bigger than the cache just return the stream
-       */
-      return cacheBlock(_lookup, cache, _currBlock, loader.getInfo());
+      @Override
+      public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
 
-    }
+        try {
+          BCFile.Reader reader = bcfr.get();
+          if (reader == null) {
+            if (loadingMetaBlock) {
+              byte[] serializedMetadata = dependencies.get(cacheId + ROOT_BLOCK_NAME);
+              reader = getBCFile(serializedMetadata);
+            } else {
+              reader = getBCFile();
+            }
+          }
 
-    private BlockRead cacheBlock(String _lookup, BlockCache cache, BlockReader _currBlock, String block) throws IOException {
+          BlockReader _currBlock = getBlockReader(maxSize, reader);
+          if (_currBlock == null) {
+            return null;
+          }
 
-      if ((cache == null) || (_currBlock.getRawSize() > Math.min(cache.getMaxSize(), MAX_ARRAY_SIZE))) {
-        return new BlockRead(_currBlock, _currBlock.getRawSize());
-      } else {
+          byte b[] = null;
+          try {
+            b = new byte[(int) _currBlock.getRawSize()];
+            _currBlock.readFully(b);
+          } catch (IOException e) {
+            log.debug("Error full blockRead for file " + cacheId + " for block " + getBlockId(), e);
+            throw new UncheckedIOException(e);
+          } finally {
+            _currBlock.close();
+          }
 
-        /**
-         * Try to fully read block for meta data if error try to close file
-         *
-         */
-        byte b[] = null;
-        try {
-          b = new byte[(int) _currBlock.getRawSize()];
-          _currBlock.readFully(b);
+          return b;
         } catch (IOException e) {
-          log.debug("Error full blockRead for file " + fileName + " for block " + block, e);
-          throw e;
-        } finally {
-          _currBlock.close();
+          throw new UncheckedIOException(e);
         }
+      }
+    }
 
-        CacheEntry ce = null;
-        try {
-          ce = cache.cacheBlock(_lookup, b);
-        } catch (Exception e) {
-          log.warn("Already cached block: " + _lookup, e);
-        }
+    private Reader(String cacheId, IoeSupplier<InputStream> inputSupplier, IoeSupplier<Long> lenghtSupplier, BlockCache data, BlockCache index,
+        RateLimiter readLimiter, Configuration conf, AccumuloConfiguration accumuloConfiguration) {
+      Preconditions.checkArgument(cacheId != null || (data == null && index == null));
+      this.cacheId = cacheId;
+      this.inputSupplier = inputSupplier;
+      this.lengthSupplier = lenghtSupplier;
+      this._dCache = data;
+      this._iCache = index;
+      this.readLimiter = readLimiter;
+      this.conf = conf;
+      this.accumuloConfiguration = accumuloConfiguration;
+    }
 
-        if (ce == null)
-          return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
-        else
-          return new CachedBlockRead(ce, ce.getBuffer());
+    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
+        throws IOException {
+      this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
+    }
 
-      }
+    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, RateLimiter readLimiter,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this(dataFile.toString(), () -> fs.open(dataFile), () -> fs.getFileStatus(dataFile).getLen(), data, index, readLimiter, conf, accumuloConfiguration);
+    }
+
+    public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf, BlockCache data,
+        BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this(cacheId, () -> fsin, () -> len, data, index, null, conf, accumuloConfiguration);
+    }
+
+    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this(null, () -> fsin, () -> len, null, null, null, conf, accumuloConfiguration);
     }
 
     /**
      * It is intended that once the BlockRead object is returned to the caller, that the caller will read the entire block and then call close on the BlockRead
      * class.
-     *
-     * NOTE: In the case of multi-read threads: This method can do redundant work where an entry is read from disk and other threads check the cache before it
-     * has been inserted.
      */
     @Override
     public BlockRead getMetaBlock(String blockName) throws IOException {
-      String _lookup = this.fileName + "M" + blockName;
-      return getBlock(_lookup, _iCache, new MetaBlockLoader(blockName, accumuloConfiguration));
+      if (_iCache != null) {
+        String _lookup = this.cacheId + "M" + blockName;
+        CacheEntry ce = _iCache.getBlock(_lookup, new MetaBlockLoader(blockName));
+        if (ce != null) {
+          return new CachedBlockRead(ce, ce.getBuffer());
+        }
+      }
+
+      BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName);
+      return new BlockRead(_currBlock, _currBlock.getRawSize());
     }
 
     @Override
     public ABlockReader getMetaBlock(long offset, long compressedSize, long rawSize) throws IOException {
-      String _lookup = this.fileName + "R" + offset;
-      return getBlock(_lookup, _iCache, new RawBlockLoader(offset, compressedSize, rawSize));
+      if (_iCache != null) {
+        String _lookup = this.cacheId + "R" + offset;
+        CacheEntry ce = _iCache.getBlock(_lookup, new RawBlockLoader(offset, compressedSize, rawSize, true));
+        if (ce != null) {
+          return new CachedBlockRead(ce, ce.getBuffer());
+        }
+      }
+
+      BlockReader _currBlock = getBCFile(null).getDataBlock(offset, compressedSize, rawSize);
+      return new BlockRead(_currBlock, _currBlock.getRawSize());
     }
 
     /**
@@ -411,15 +429,30 @@ public class CachableBlockFile {
 
     @Override
     public BlockRead getDataBlock(int blockIndex) throws IOException {
-      String _lookup = this.fileName + "O" + blockIndex;
-      return getBlock(_lookup, _dCache, new OffsetBlockLoader(blockIndex));
+      if (_dCache != null) {
+        String _lookup = this.cacheId + "O" + blockIndex;
+        CacheEntry ce = _dCache.getBlock(_lookup, new OffsetBlockLoader(blockIndex, false));
+        if (ce != null) {
+          return new CachedBlockRead(ce, ce.getBuffer());
+        }
+      }
 
+      BlockReader _currBlock = getBCFile().getDataBlock(blockIndex);
+      return new BlockRead(_currBlock, _currBlock.getRawSize());
     }
 
     @Override
     public ABlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException {
-      String _lookup = this.fileName + "R" + offset;
-      return getBlock(_lookup, _dCache, new RawBlockLoader(offset, compressedSize, rawSize));
+      if (_dCache != null) {
+        String _lookup = this.cacheId + "R" + offset;
+        CacheEntry ce = _dCache.getBlock(_lookup, new RawBlockLoader(offset, compressedSize, rawSize, false));
+        if (ce != null) {
+          return new CachedBlockRead(ce, ce.getBuffer());
+        }
+      }
+
+      BlockReader _currBlock = getBCFile().getDataBlock(offset, compressedSize, rawSize);
+      return new BlockRead(_currBlock, _currBlock.getRawSize());
     }
 
     @Override
@@ -429,8 +462,9 @@ public class CachableBlockFile {
 
       closed = true;
 
-      if (_bc != null)
-        _bc.close();
+      BCFile.Reader reader = bcfr.get();
+      if (reader != null)
+        reader.close();
 
       if (fin != null) {
         // synchronize on the FSDataInputStream to ensure thread safety with the BoundedRangeFileInputStream
@@ -477,25 +511,13 @@ public class CachableBlockFile {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public <T> T getIndex(Class<T> clazz) {
-      T bi = null;
-      synchronized (cb) {
-        SoftReference<T> softRef = (SoftReference<T>) cb.getIndex();
-        if (softRef != null)
-          bi = softRef.get();
-
-        if (bi == null) {
-          try {
-            bi = clazz.newInstance();
-          } catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-          cb.setIndex(new SoftReference<>(bi));
-        }
-      }
+    public <T extends Weighbable> T getIndex(Supplier<T> indexSupplier) {
+      return cb.getIndex(indexSupplier);
+    }
 
-      return bi;
+    @Override
+    public void indexWeightChanged() {
+      cb.indexWeightChanged();
     }
   }
 
@@ -537,7 +559,7 @@ public class CachableBlockFile {
     }
 
     @Override
-    public <T> T getIndex(Class<T> clazz) {
+    public <T extends Weighbable> T getIndex(Supplier<T> clazz) {
       throw new UnsupportedOperationException();
     }
 
@@ -549,5 +571,10 @@ public class CachableBlockFile {
       throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void indexWeightChanged() {
+      throw new UnsupportedOperationException();
+    }
+
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
index 652515e..3401396 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
@@ -24,22 +24,28 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
+import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
+import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 
 /**
  *
  */
-public class BlockIndex {
+public class BlockIndex implements Weighbable {
 
   public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
 
-    BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
+    BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex::new);
+    if (blockIndex == null)
+      return null;
 
     int accessCount = blockIndex.accessCount.incrementAndGet();
 
     // 1 is a power of two, but do not care about it
     if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
       blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
+      cacheBlock.indexWeightChanged();
     }
 
     if (blockIndex.blockIndex != null)
@@ -98,8 +104,12 @@ public class BlockIndex {
 
     @Override
     public int hashCode() {
-      assert false : "hashCode not designed";
-      return 42; // any arbitrary constant will do
+      throw new UnsupportedOperationException("hashCode not designed");
+    }
+
+    int weight() {
+      int keyWeight = ClassSize.align(prevKey.getSize()) + ClassSize.OBJECT + SizeConstants.SIZEOF_LONG + 4 * (ClassSize.ARRAY + ClassSize.REFERENCE);
+      return 2 * SizeConstants.SIZEOF_INT + ClassSize.REFERENCE + ClassSize.OBJECT + keyWeight;
     }
   }
 
@@ -186,4 +196,17 @@ public class BlockIndex {
   BlockIndexEntry[] getIndexEntries() {
     return blockIndex;
   }
+
+  @Override
+  public synchronized int weight() {
+    int weight = 0;
+    if (blockIndex != null) {
+      for (BlockIndexEntry blockIndexEntry : blockIndex) {
+        weight += blockIndexEntry.weight();
+      }
+    }
+
+    weight += ClassSize.ATOMIC_INTEGER + ClassSize.OBJECT + 2 * ClassSize.REFERENCE + ClassSize.ARRAY;
+    return weight;
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index e745583..e998a87 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -17,6 +17,7 @@
 
 package org.apache.accumulo.core.file.rfile.bcfile;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.DataInput;
@@ -26,6 +27,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,8 +38,6 @@ import java.util.TreeMap;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.BlockRead;
 import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.Scalar;
 import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
@@ -595,8 +595,6 @@ public final class BCFile {
    * BCFile Reader, interface to read the file's data and meta blocks.
    */
   static public class Reader implements Closeable {
-    private static final String META_NAME = "BCFile.metaindex";
-    private static final String CRYPTO_BLOCK_NAME = "BCFile.cryptoparams";
     private final SeekableDataInputStream in;
     private final Configuration conf;
     final DataIndex dataIndex;
@@ -759,6 +757,37 @@ public final class BCFile {
       }
     }
 
+    public byte[] serializeMetadata(int maxSize) {
+      try {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(baos);
+
+        metaIndex.write(out);
+        if (out.size() > maxSize) {
+          return null;
+        }
+        dataIndex.write(out);
+        if (out.size() > maxSize) {
+          return null;
+        }
+        if (cryptoParams == null) {
+          out.writeBoolean(false);
+        } else {
+          out.writeBoolean(true);
+          cryptoParams.write(out);
+        }
+
+        if (out.size() > maxSize) {
+          return null;
+        }
+
+        out.close();
+        return baos.toByteArray();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
     /**
      * Constructor
      *
@@ -847,131 +876,24 @@ public final class BCFile {
       }
     }
 
-    public <InputStreamType extends InputStream & Seekable> Reader(CachableBlockFile.Reader cache, InputStreamType fin, long fileLength, Configuration conf,
+    public <InputStreamType extends InputStream & Seekable> Reader(byte[] serializedMetadata, InputStreamType fin, Configuration conf,
         AccumuloConfiguration accumuloConfiguration) throws IOException {
       this.in = new SeekableDataInputStream(fin);
       this.conf = conf;
 
-      BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME);
-      BlockRead cachedDataIndex = cache.getCachedMetaBlock(DataIndex.BLOCK_NAME);
-      BlockRead cachedCryptoParams = cache.getCachedMetaBlock(CRYPTO_BLOCK_NAME);
-
-      if (cachedMetaIndex == null || cachedDataIndex == null || cachedCryptoParams == null) {
-        // move the cursor to the beginning of the tail, containing: offset to the
-        // meta block index, version and magic
-        // Move the cursor to grab the version and the magic first
-        this.in.seek(fileLength - Magic.size() - Version.size());
-        version = new Version(this.in);
-        Magic.readAndVerify(this.in);
-
-        // Do a version check
-        if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
-          throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
-        }
-
-        // Read the right number offsets based on version
-        long offsetIndexMeta = 0;
-        long offsetCryptoParameters = 0;
-
-        if (version.equals(API_VERSION_1)) {
-          this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
-          offsetIndexMeta = this.in.readLong();
-
-        } else {
-          this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
-          offsetIndexMeta = this.in.readLong();
-          offsetCryptoParameters = this.in.readLong();
-        }
-
-        // read meta index
-        this.in.seek(offsetIndexMeta);
-        metaIndex = new MetaIndex(this.in);
-
-        // If they exist, read the crypto parameters
-        if (!version.equals(BCFile.API_VERSION_1) && cachedCryptoParams == null) {
-
-          // read crypto parameters
-          this.in.seek(offsetCryptoParameters);
-          cryptoParams = new BCFileCryptoModuleParameters();
-          cryptoParams.read(this.in);
-
-          if (accumuloConfiguration.getBoolean(Property.CRYPTO_OVERRIDE_KEY_STRATEGY_WITH_CONFIGURED_STRATEGY)) {
-            Map<String,String> cryptoConfFromAccumuloConf = accumuloConfiguration.getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
-            Map<String,String> instanceConf = accumuloConfiguration.getAllPropertiesWithPrefix(Property.INSTANCE_PREFIX);
-
-            cryptoConfFromAccumuloConf.putAll(instanceConf);
-
-            for (String name : cryptoParams.getAllOptions().keySet()) {
-              if (!name.equals(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey())) {
-                cryptoConfFromAccumuloConf.put(name, cryptoParams.getAllOptions().get(name));
-              } else {
-                cryptoParams.setKeyEncryptionStrategyClass(cryptoConfFromAccumuloConf.get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()));
-              }
-            }
-
-            cryptoParams.setAllOptions(cryptoConfFromAccumuloConf);
-          }
-
-          ByteArrayOutputStream baos = new ByteArrayOutputStream();
-          DataOutputStream dos = new DataOutputStream(baos);
-          cryptoParams.write(dos);
-          dos.close();
-          cache.cacheMetaBlock(CRYPTO_BLOCK_NAME, baos.toByteArray());
-
-          this.cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoParams.getAllOptions().get(Property.CRYPTO_MODULE_CLASS.getKey()));
-          this.secretKeyEncryptionStrategy = CryptoModuleFactory.getSecretKeyEncryptionStrategy(cryptoParams.getKeyEncryptionStrategyClass());
+      ByteArrayInputStream bais = new ByteArrayInputStream(serializedMetadata);
+      DataInputStream dis = new DataInputStream(bais);
 
-          // This call should put the decrypted session key within the cryptoParameters object
-          // secretKeyEncryptionStrategy.decryptSecretKey(cryptoParameters);
+      version = null;
 
-          cryptoParams = (BCFileCryptoModuleParameters) secretKeyEncryptionStrategy.decryptSecretKey(cryptoParams);
-
-        } else if (cachedCryptoParams != null) {
-          setupCryptoFromCachedData(cachedCryptoParams);
-        } else {
-          // Place something in cache that indicates this file has no crypto metadata. See ACCUMULO-4141
-          ByteArrayOutputStream baos = new ByteArrayOutputStream();
-          DataOutputStream dos = new DataOutputStream(baos);
-          NO_CRYPTO.write(dos);
-          dos.close();
-          cache.cacheMetaBlock(CRYPTO_BLOCK_NAME, baos.toByteArray());
-        }
-
-        if (cachedMetaIndex == null) {
-          ByteArrayOutputStream baos = new ByteArrayOutputStream();
-          DataOutputStream dos = new DataOutputStream(baos);
-          metaIndex.write(dos);
-          dos.close();
-          cache.cacheMetaBlock(META_NAME, baos.toByteArray());
-        }
-
-        // read data:BCFile.index, the data block index
-        if (cachedDataIndex == null) {
-          BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
-          cachedDataIndex = cache.cacheMetaBlock(DataIndex.BLOCK_NAME, blockR);
-        }
-
-        try {
-          dataIndex = new DataIndex(cachedDataIndex);
-        } catch (IOException e) {
-          LOG.error("Got IOException when trying to create DataIndex block");
-          throw e;
-        } finally {
-          cachedDataIndex.close();
-        }
-
-      } else {
-        // We have cached versions of the metaIndex, dataIndex and cryptoParams objects.
-        // Use them to fill out this reader's members.
-        version = null;
-
-        metaIndex = new MetaIndex(cachedMetaIndex);
-        dataIndex = new DataIndex(cachedDataIndex);
-        setupCryptoFromCachedData(cachedCryptoParams);
+      metaIndex = new MetaIndex(dis);
+      dataIndex = new DataIndex(dis);
+      if (dis.readBoolean()) {
+        setupCryptoFromCachedData(dis);
       }
     }
 
-    private void setupCryptoFromCachedData(BlockRead cachedCryptoParams) throws IOException {
+    private void setupCryptoFromCachedData(DataInput cachedCryptoParams) throws IOException {
       BCFileCryptoModuleParameters params = new BCFileCryptoModuleParameters();
       params.read(cachedCryptoParams);
 
@@ -1052,6 +974,15 @@ public final class BCFile {
       return createReader(imeBCIndex.getCompressionAlgorithm(), region);
     }
 
+    public long getMetaBlockRawSize(String name) throws IOException, MetaBlockDoesNotExist {
+      MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
+      if (imeBCIndex == null) {
+        throw new MetaBlockDoesNotExist("name=" + name);
+      }
+
+      return imeBCIndex.getRegion().getRawSize();
+    }
+
     /**
      * Stream access to a Data Block.
      *
@@ -1073,11 +1004,18 @@ public final class BCFile {
       return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
     }
 
+    public long getDataBlockRawSize(int blockIndex) {
+      if (blockIndex < 0 || blockIndex >= getBlockCount()) {
+        throw new IndexOutOfBoundsException(String.format("blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
+      }
+
+      return dataIndex.getBlockRegionList().get(blockIndex).getRawSize();
+    }
+
     private BlockReader createReader(Algorithm compressAlgo, BlockRegion region) throws IOException {
       RBlockState rbs = new RBlockState(compressAlgo, in, region, conf, cryptoModule, version, cryptoParams);
       return new BlockReader(rbs);
     }
-
   }
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index fa92fe0..e633f13 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -66,11 +66,6 @@ public class SummaryReader {
     }
 
     @Override
-    public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
-      return summaryCache.cacheBlock(blockName, buf, inMemory);
-    }
-
-    @Override
     public CacheEntry getBlock(String blockName) {
       CacheEntry ce = summaryCache.getBlock(blockName);
       if (ce == null) {
@@ -81,6 +76,34 @@ public class SummaryReader {
     }
 
     @Override
+    public CacheEntry getBlock(String blockName, Loader loader) {
+      Loader idxLoader = new Loader() {
+
+        CacheEntry idxCacheEntry;
+
+        @Override
+        public Map<String,Loader> getDependencies() {
+          idxCacheEntry = indexCache.getBlock(blockName);
+          if (idxCacheEntry == null) {
+            return loader.getDependencies();
+          } else {
+            return Collections.emptyMap();
+          }
+        }
+
+        @Override
+        public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
+          if (idxCacheEntry == null) {
+            return loader.load(maxSize, dependencies);
+          } else {
+            return idxCacheEntry.getBuffer();
+          }
+        }
+      };
+      return summaryCache.getBlock(blockName, idxLoader);
+    }
+
+    @Override
     public long getMaxSize() {
       return summaryCache.getMaxSize();
     }
@@ -144,9 +167,9 @@ public class SummaryReader {
     return fileSummaries;
   }
 
-  public static SummaryReader load(String id, Configuration conf, AccumuloConfiguration aConf, InputStream inputStream, long length,
+  public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf, InputStream inputStream, long length,
       Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory) throws IOException {
-    org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader = new CachableBlockFile.Reader(id, (InputStream & Seekable) inputStream,
+    org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader = new CachableBlockFile.Reader((InputStream & Seekable) inputStream,
         length, conf, aConf);
     return load(bcReader, summarySelector, factory);
   }
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
index 8b7f32b..4b4a886 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
@@ -51,8 +51,8 @@ public class TestLruBlockCache extends TestCase {
     cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(1000027));
     cc.set(Property.TSERV_SUMMARYCACHE_SIZE, Long.toString(1000029));
 
-    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.93f).acceptableFactor(0.97f).singleFactor(0.20f)
-        .multiFactor(0.30f).memoryFactor(0.50f).mapConcurrencyLevel(5).buildMap().forEach(cc::set);
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.93f).acceptableFactor(0.97f).singleFactor(0.20f).multiFactor(0.30f)
+        .memoryFactor(0.50f).mapConcurrencyLevel(5).buildMap().forEach(cc::set);
 
     String defaultPrefix = BlockCacheManager.CACHE_PROPERTY_BASE + LruBlockCacheConfiguration.PROPERTY_PREFIX + ".default.";
 
@@ -225,8 +225,8 @@ public class TestLruBlockCache extends TestCase {
     BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc);
     cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
     cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
-    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.25f)
-        .multiFactor(0.50f).memoryFactor(0.25f).buildMap().forEach(cc::set);
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.25f).multiFactor(0.50f)
+        .memoryFactor(0.25f).buildMap().forEach(cc::set);
     manager.start(new BlockCacheConfiguration(cc));
     LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
@@ -289,8 +289,8 @@ public class TestLruBlockCache extends TestCase {
     BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc);
     cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
     cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
-    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.33f)
-        .multiFactor(0.33f).memoryFactor(0.34f).buildMap().forEach(cc::set);
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.33f).multiFactor(0.33f)
+        .memoryFactor(0.34f).buildMap().forEach(cc::set);
     manager.start(new BlockCacheConfiguration(cc));
     LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
@@ -412,8 +412,8 @@ public class TestLruBlockCache extends TestCase {
     BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc);
     cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
     cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
-    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.66f).acceptableFactor(0.99f).singleFactor(0.33f)
-        .multiFactor(0.33f).memoryFactor(0.34f).buildMap().forEach(cc::set);
+    LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.66f).acceptableFactor(0.99f).singleFactor(0.33f).multiFactor(0.33f)
+        .memoryFactor(0.34f).buildMap().forEach(cc::set);
     manager.start(new BlockCacheConfiguration(cc));
     LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
index 2985591..93c54e5 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.file.rfile;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -43,20 +44,22 @@ public class BlockIndexTest {
       this.data = d;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public void setIndex(Object idx) {
-      this.idx = idx;
-    }
-
-    @Override
-    public Object getIndex() {
-      return idx;
+    public <T extends Weighbable> T getIndex(Supplier<T> indexSupplier) {
+      if (idx == null) {
+        idx = indexSupplier.get();
+      }
+      return (T) idx;
     }
 
     @Override
     public byte[] getBuffer() {
       return data;
     }
+
+    @Override
+    public void indexWeightChanged() {}
   }
 
   @Test
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 36e7af4..1c61162 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -77,7 +77,7 @@ public class MultiLevelIndexTest extends TestCase {
     byte[] data = baos.toByteArray();
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
     FSDataInputStream in = new FSDataInputStream(bais);
-    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source1", in, data.length, CachedConfiguration.getInstance(), aconf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance(), aconf);
 
     Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
     BlockRead rootIn = _cbr.getMetaBlock("root");
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 2581f91..0645880 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -287,7 +287,6 @@ public class RFileTest {
       LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA);
 
       CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in, fileLength, conf, dataCache, indexCache, DefaultConfiguration.getInstance());
-
       reader = new RFile.Reader(_cbr);
       if (cfsi)
         iter = new ColumnFamilySkippingIterator(reader);
@@ -1647,7 +1646,7 @@ public class RFileTest {
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
     FSDataInputStream in2 = new FSDataInputStream(bais);
     AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
-    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in2, data.length, CachedConfiguration.getInstance(), aconf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in2, data.length, CachedConfiguration.getInstance(), aconf);
     Reader reader = new RFile.Reader(_cbr);
     checkIndex(reader);
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@accumulo.apache.org" <co...@accumulo.apache.org>'].