You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2020/01/02 21:02:39 UTC

[accumulo] branch master updated: Give scans control over cache via scan dispatchers #1383 (#1440)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new cf59651  Give scans control over cache via scan dispatchers #1383 (#1440)
cf59651 is described below

commit cf59651efe8611c69315fb20a0aaa823f6b3f1f6
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 2 16:02:31 2020 -0500

    Give scans control over cache via scan dispatchers #1383 (#1440)
    
    This commit enables per scan control over cache usages.  This was done
    via extending the scope of ScanDispatchers to include cache control.
    
    The built in SimpleScanDispatcher was improved to support mapping
    scanner execution hints of the form scan_type=X to
    cache usage directives for a scan.
    
    Accumulo caches open files.  Before this change the cache was bound to
    the file when it was opened.  Now a cache can be bound to an already
    open file. The internal interface CacheProvider was created to
    facilitate this.
---
 .../accumulo/core/client/rfile/RFileScanner.java   |   9 +-
 .../accumulo/core/file/BloomFilterLayer.java       |   5 +
 .../apache/accumulo/core/file/FileOperations.java  |  72 ++++----------
 .../apache/accumulo/core/file/FileSKVIterator.java |   3 +
 .../impl/BasicCacheProvider.java}                  |  32 +++---
 .../file/blockfile/impl/CachableBlockFile.java     |  27 ++---
 .../impl/CacheProvider.java}                       |  24 ++---
 .../blockfile/impl/OpportunisticBlockCache.java    |  62 ++++++++++++
 .../file/blockfile/impl/ScanCacheProvider.java     |  80 +++++++++++++++
 .../accumulo/core/file/map/MapFileOperations.java  |   4 +
 .../core/file/rfile/MultiIndexIterator.java        |   5 +
 .../org/apache/accumulo/core/file/rfile/RFile.java |  11 +++
 .../accumulo/core/file/rfile/RFileOperations.java  |   4 +-
 .../core/iteratorsImpl/system/MapFileIterator.java |   4 +
 .../iteratorsImpl/system/SequenceFileIterator.java |   4 +
 .../core/spi/scan/DefaultScanDirectives.java       |  56 +++++++++++
 .../accumulo/core/spi/scan/ScanDirectives.java     | 109 +++++++++++++++++++++
 .../accumulo/core/spi/scan/ScanDirectivesImpl.java |  98 ++++++++++++++++++
 .../accumulo/core/spi/scan/ScanDispatcher.java     |  49 ++++++++-
 .../core/spi/scan/SimpleScanDispatcher.java        | 105 +++++++++++++++-----
 .../accumulo/core/summary/SummaryReader.java       |   3 +-
 .../apache/accumulo/core/file/rfile/RFileTest.java |   6 +-
 .../core/spi/scan/SimpleScanDispatcherTest.java    |  51 ++++++++--
 .../org/apache/accumulo/tserver/FileManager.java   |  35 +++----
 .../tserver/TabletServerResourceManager.java       |  36 +++++--
 .../accumulo/tserver/scan/ScanParameters.java      |  10 ++
 .../accumulo/tserver/tablet/ScanDataSource.java    |   3 +-
 .../compaction/DefaultCompactionStrategyTest.java  |   4 +
 28 files changed, 741 insertions(+), 170 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 0a760d9..72eecf5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -50,7 +50,9 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
+import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.iterators.IteratorAdapter;
@@ -339,12 +341,15 @@ class RFileScanner extends ScannerOptions implements Scanner {
     try {
       RFileSource[] sources = opts.in.getSources();
       List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
+
+      CacheProvider cacheProvider = new BasicCacheProvider(indexCache, dataCache);
+
       for (int i = 0; i < sources.length; i++) {
         // TODO may have been a bug with multiple files and caching in older version...
         FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
         CachableBuilder cb = new CachableBuilder().cacheId("source-" + i).input(inputStream)
-            .length(sources[i].getLength()).conf(opts.in.getConf()).data(dataCache)
-            .index(indexCache).cryptoService(cryptoService);
+            .length(sources[i].getLength()).conf(opts.in.getConf()).cacheProvider(cacheProvider)
+            .cryptoService(cryptoService);
         readers.add(new RFile.Reader(cb));
       }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 18568e9..2313efe 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.file.keyfunctor.KeyFunctor;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -443,6 +444,10 @@ public class BloomFilterLayer {
       return new BloomFilterLayer.Reader(reader.getSample(sampleConfig), bfl);
     }
 
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {
+      reader.setCacheProvider(cacheProvider);
+    }
   }
 
   public static void main(String[] args) throws IOException {
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 3789e28..53ba829 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.core.file;
 
+import static org.apache.accumulo.core.file.blockfile.impl.CacheProvider.NULL_PROVIDER;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -29,8 +31,8 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.file.rfile.RFile;
-import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
@@ -175,8 +177,7 @@ public abstract class FileOperations {
     public final FSDataOutputStream outputStream;
     public final boolean enableAccumuloStart;
     // reader only objects
-    public final BlockCache dataCache;
-    public final BlockCache indexCache;
+    public final CacheProvider cacheProvider;
     public final Cache<String,Long> fileLenCache;
     public final boolean seekToBeginning;
     public final CryptoService cryptoService;
@@ -187,10 +188,9 @@ public abstract class FileOperations {
 
     public FileOptions(AccumuloConfiguration tableConfiguration, String filename, FileSystem fs,
         Configuration fsConf, RateLimiter rateLimiter, String compression,
-        FSDataOutputStream outputStream, boolean enableAccumuloStart, BlockCache dataCache,
-        BlockCache indexCache, Cache<String,Long> fileLenCache, boolean seekToBeginning,
-        CryptoService cryptoService, Range range, Set<ByteSequence> columnFamilies,
-        boolean inclusive) {
+        FSDataOutputStream outputStream, boolean enableAccumuloStart, CacheProvider cacheProvider,
+        Cache<String,Long> fileLenCache, boolean seekToBeginning, CryptoService cryptoService,
+        Range range, Set<ByteSequence> columnFamilies, boolean inclusive) {
       this.tableConfiguration = tableConfiguration;
       this.filename = filename;
       this.fs = fs;
@@ -199,8 +199,7 @@ public abstract class FileOperations {
       this.compression = compression;
       this.outputStream = outputStream;
       this.enableAccumuloStart = enableAccumuloStart;
-      this.dataCache = dataCache;
-      this.indexCache = indexCache;
+      this.cacheProvider = cacheProvider;
       this.fileLenCache = fileLenCache;
       this.seekToBeginning = seekToBeginning;
       this.cryptoService = Objects.requireNonNull(cryptoService);
@@ -241,12 +240,8 @@ public abstract class FileOperations {
       return enableAccumuloStart;
     }
 
-    public BlockCache getDataCache() {
-      return dataCache;
-    }
-
-    public BlockCache getIndexCache() {
-      return indexCache;
+    public CacheProvider getCacheProvider() {
+      return cacheProvider;
     }
 
     public Cache<String,Long> getFileLenCache() {
@@ -318,25 +313,25 @@ public abstract class FileOperations {
     protected FileOptions toWriterBuilderOptions(String compression,
         FSDataOutputStream outputStream, boolean startEnabled) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, compression,
-          outputStream, startEnabled, null, null, null, false, cryptoService, null, null, true);
+          outputStream, startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, true);
     }
 
-    protected FileOptions toReaderBuilderOptions(BlockCache dataCache, BlockCache indexCache,
+    protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider,
         Cache<String,Long> fileLenCache, boolean seekToBeginning) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
-          false, dataCache, indexCache, fileLenCache, seekToBeginning, cryptoService, null, null,
-          true);
+          false, cacheProvider == null ? NULL_PROVIDER : cacheProvider, fileLenCache,
+          seekToBeginning, cryptoService, null, null, true);
     }
 
     protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
-          false, null, null, fileLenCache, false, cryptoService, null, null, true);
+          false, NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true);
     }
 
     protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
         boolean inclusive) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
-          false, null, null, null, false, cryptoService, range, columnFamilies, inclusive);
+          false, NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, inclusive);
     }
 
     protected AccumuloConfiguration getTableConfiguration() {
@@ -399,8 +394,7 @@ public abstract class FileOperations {
    * Options common to all {@code FileOperations} which perform reads.
    */
   public class ReaderBuilder extends FileHelper implements ReaderTableConfiguration {
-    private BlockCache dataCache;
-    private BlockCache indexCache;
+    private CacheProvider cacheProvider;
     private Cache<String,Long> fileLenCache;
     private boolean seekToBeginning = false;
 
@@ -420,23 +414,8 @@ public abstract class FileOperations {
      * (Optional) Set the block cache pair to be used to optimize reads within the constructed
      * reader.
      */
-    public ReaderBuilder withBlockCache(BlockCache dataCache, BlockCache indexCache) {
-      this.dataCache = dataCache;
-      this.indexCache = indexCache;
-      return this;
-    }
-
-    /** (Optional) set the data cache to be used to optimize reads within the constructed reader. */
-    public ReaderBuilder withDataCache(BlockCache dataCache) {
-      this.dataCache = dataCache;
-      return this;
-    }
-
-    /**
-     * (Optional) set the index cache to be used to optimize reads within the constructed reader.
-     */
-    public ReaderBuilder withIndexCache(BlockCache indexCache) {
-      this.indexCache = indexCache;
+    public ReaderBuilder withCacheProvider(CacheProvider cacheProvider) {
+      this.cacheProvider = cacheProvider;
       return this;
     }
 
@@ -467,18 +446,7 @@ public abstract class FileOperations {
 
     /** Execute the operation, constructing the specified file reader. */
     public FileSKVIterator build() throws IOException {
-      /**
-       * If the table configuration disallows caching, rewrite the options object to not pass the
-       * caches.
-       */
-      if (!getTableConfiguration().getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) {
-        withIndexCache(null);
-      }
-      if (!getTableConfiguration().getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) {
-        withDataCache(null);
-      }
-      return openReader(
-          toReaderBuilderOptions(dataCache, indexCache, fileLenCache, seekToBeginning));
+      return openReader(toReaderBuilderOptions(cacheProvider, fileLenCache, seekToBeginning));
     }
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
index a7a62f3..cd1b227 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
@@ -22,6 +22,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 
@@ -36,6 +37,8 @@ public interface FileSKVIterator extends InterruptibleIterator, AutoCloseable {
 
   void closeDeepCopies() throws IOException;
 
+  void setCacheProvider(CacheProvider cacheProvider);
+
   @Override
   void close() throws IOException;
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/BasicCacheProvider.java
similarity index 54%
copy from core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
copy to core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/BasicCacheProvider.java
index a7a62f3..7c3015c 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/BasicCacheProvider.java
@@ -16,26 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.file;
+package org.apache.accumulo.core.file.blockfile.impl;
 
-import java.io.DataInputStream;
-import java.io.IOException;
+import org.apache.accumulo.core.spi.cache.BlockCache;
 
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
-import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+public class BasicCacheProvider implements CacheProvider {
 
-public interface FileSKVIterator extends InterruptibleIterator, AutoCloseable {
-  Key getFirstKey() throws IOException;
+  private final BlockCache indexCache;
+  private final BlockCache dataCache;
 
-  Key getLastKey() throws IOException;
+  public BasicCacheProvider(BlockCache indexCache, BlockCache dataCache) {
+    this.indexCache = indexCache;
+    this.dataCache = dataCache;
+  }
 
-  DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException;
-
-  FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig);
-
-  void closeDeepCopies() throws IOException;
+  @Override
+  public BlockCache getDataCache() {
+    return dataCache;
+  }
 
   @Override
-  void close() throws IOException;
+  public BlockCache getIndexCache() {
+    return indexCache;
+  }
+
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index c9a71b3..18bd854 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -72,8 +72,7 @@ public class CachableBlockFile {
     IoeSupplier<InputStream> inputSupplier = null;
     IoeSupplier<Long> lengthSupplier = null;
     Cache<String,Long> fileLenCache = null;
-    BlockCache dCache = null;
-    BlockCache iCache = null;
+    volatile CacheProvider cacheProvider = CacheProvider.NULL_PROVIDER;
     RateLimiter readLimiter = null;
     Configuration hadoopConf = null;
     CryptoService cryptoService = null;
@@ -110,13 +109,8 @@ public class CachableBlockFile {
       return this;
     }
 
-    public CachableBuilder data(BlockCache dCache) {
-      this.dCache = dCache;
-      return this;
-    }
-
-    public CachableBuilder index(BlockCache iCache) {
-      this.iCache = iCache;
+    public CachableBuilder cacheProvider(CacheProvider cacheProvider) {
+      this.cacheProvider = cacheProvider;
       return this;
     }
 
@@ -138,8 +132,7 @@ public class CachableBlockFile {
     private final RateLimiter readLimiter;
     // private BCFile.Reader _bc;
     private final String cacheId;
-    private final BlockCache _dCache;
-    private final BlockCache _iCache;
+    private CacheProvider cacheProvider;
     private Cache<String,Long> fileLenCache = null;
     private volatile InputStream fin = null;
     private boolean closed = false;
@@ -208,6 +201,7 @@ public class CachableBlockFile {
     }
 
     private BCFile.Reader getBCFile() throws IOException {
+      BlockCache _iCache = cacheProvider.getIndexCache();
       if (_iCache != null) {
         CacheEntry mce = _iCache.getBlock(cacheId + ROOT_BLOCK_NAME, new BCFileLoader());
         if (mce != null) {
@@ -369,8 +363,7 @@ public class CachableBlockFile {
       this.inputSupplier = b.inputSupplier;
       this.lengthSupplier = b.lengthSupplier;
       this.fileLenCache = b.fileLenCache;
-      this._dCache = b.dCache;
-      this._iCache = b.iCache;
+      this.cacheProvider = b.cacheProvider;
       this.readLimiter = b.readLimiter;
       this.conf = b.hadoopConf;
       this.cryptoService = Objects.requireNonNull(b.cryptoService);
@@ -381,6 +374,7 @@ public class CachableBlockFile {
      * read the entire block and then call close on the BlockRead class.
      */
     public CachedBlockRead getMetaBlock(String blockName) throws IOException {
+      BlockCache _iCache = cacheProvider.getIndexCache();
       if (_iCache != null) {
         String _lookup = this.cacheId + "M" + blockName;
         try {
@@ -405,6 +399,7 @@ public class CachableBlockFile {
 
     public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSize)
         throws IOException {
+      BlockCache _iCache = cacheProvider.getIndexCache();
       if (_iCache != null) {
         String _lookup = this.cacheId + "R" + offset;
         CacheEntry ce =
@@ -427,6 +422,7 @@ public class CachableBlockFile {
      */
 
     public CachedBlockRead getDataBlock(int blockIndex) throws IOException {
+      BlockCache _dCache = cacheProvider.getDataCache();
       if (_dCache != null) {
         String _lookup = this.cacheId + "O" + blockIndex;
         CacheEntry ce = _dCache.getBlock(_lookup, new OffsetBlockLoader(blockIndex, false));
@@ -441,6 +437,7 @@ public class CachableBlockFile {
 
     public CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSize)
         throws IOException {
+      BlockCache _dCache = cacheProvider.getDataCache();
       if (_dCache != null) {
         String _lookup = this.cacheId + "R" + offset;
         CacheEntry ce =
@@ -474,6 +471,10 @@ public class CachableBlockFile {
       }
     }
 
+    public void setCacheProvider(CacheProvider cacheProvider) {
+      this.cacheProvider = cacheProvider;
+    }
+
   }
 
   public static class CachedBlockRead extends DataInputStream {
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CacheProvider.java
similarity index 53%
copy from core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
copy to core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CacheProvider.java
index a7a62f3..c7c6d6c 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CacheProvider.java
@@ -16,26 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.file;
+package org.apache.accumulo.core.file.blockfile.impl;
 
-import java.io.DataInputStream;
-import java.io.IOException;
+import org.apache.accumulo.core.spi.cache.BlockCache;
 
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
-import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+public interface CacheProvider {
+  static final CacheProvider NULL_PROVIDER = new BasicCacheProvider(null, null);
 
-public interface FileSKVIterator extends InterruptibleIterator, AutoCloseable {
-  Key getFirstKey() throws IOException;
+  BlockCache getDataCache();
 
-  Key getLastKey() throws IOException;
-
-  DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException;
-
-  FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig);
-
-  void closeDeepCopies() throws IOException;
-
-  @Override
-  void close() throws IOException;
+  BlockCache getIndexCache();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/OpportunisticBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/OpportunisticBlockCache.java
new file mode 100644
index 0000000..60c8f0a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/OpportunisticBlockCache.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.impl;
+
+import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.cache.CacheEntry;
+
+public class OpportunisticBlockCache implements BlockCache {
+
+  private BlockCache cache;
+
+  public OpportunisticBlockCache(BlockCache cache) {
+    this.cache = cache;
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buf) {
+    return null;
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName) {
+    return cache.getBlock(blockName);
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName, Loader loader) {
+    return cache.getBlock(blockName);
+  }
+
+  @Override
+  public long getMaxHeapSize() {
+    return cache.getMaxHeapSize();
+  }
+
+  @Override
+  public long getMaxSize() {
+    return cache.getMaxSize();
+  }
+
+  @Override
+  public Stats getStats() {
+    return cache.getStats();
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
new file mode 100644
index 0000000..f94a42d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.impl;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.scan.ScanDirectives;
+
+public class ScanCacheProvider implements CacheProvider {
+
+  private final BlockCache indexCache;
+  private final BlockCache dataCache;
+
+  public ScanCacheProvider(AccumuloConfiguration tableConfig, ScanDirectives directives,
+      BlockCache indexCache, BlockCache dataCache) {
+    switch (directives.getIndexCacheUsage()) {
+      case ENABLED:
+        this.indexCache = indexCache;
+        break;
+      case DISABLED:
+        this.indexCache = null;
+        break;
+      case OPPORTUNISTIC:
+        this.indexCache = new OpportunisticBlockCache(indexCache);
+        break;
+      case TABLE:
+        this.indexCache =
+            tableConfig.getBoolean(Property.TABLE_INDEXCACHE_ENABLED) ? indexCache : null;
+        break;
+      default:
+        throw new IllegalStateException();
+    }
+
+    switch (directives.getDataCacheUsage()) {
+      case ENABLED:
+        this.dataCache = dataCache;
+        break;
+      case DISABLED:
+        this.dataCache = null;
+        break;
+      case OPPORTUNISTIC:
+        this.dataCache = new OpportunisticBlockCache(dataCache);
+        break;
+      case TABLE:
+        this.dataCache =
+            tableConfig.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED) ? dataCache : null;
+        break;
+      default:
+        throw new IllegalStateException();
+    }
+
+  }
+
+  @Override
+  public BlockCache getDataCache() {
+    return dataCache;
+  }
+
+  @Override
+  public BlockCache getIndexCache() {
+    return indexCache;
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index eb145d9..38e7330 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iteratorsImpl.system.MapFileIterator;
@@ -137,6 +138,9 @@ public class MapFileOperations extends FileOperations {
     public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
       return ((FileSKVIterator) reader).getSample(sampleConfig);
     }
+
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {}
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
index 13fe263..dabb3cc 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -103,4 +104,8 @@ class MultiIndexIterator extends HeapIterator implements FileSKVIterator {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void setCacheProvider(CacheProvider cacheProvider) {
+    source.setCacheProvider(cacheProvider);
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 50b7a87..38d4c87 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.NoSuchMetaStoreException;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
@@ -1128,6 +1129,11 @@ public class RFile {
     public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {
+      throw new UnsupportedOperationException();
+    }
   }
 
   public static class Reader extends HeapIterator implements FileSKVIterator {
@@ -1514,5 +1520,10 @@ public class RFile {
         lgr.setInterruptFlag(interruptFlag);
       }
     }
+
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {
+      reader.setCacheProvider(cacheProvider);
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 14be8d0..645ae51 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -50,8 +50,8 @@ public class RFileOperations extends FileOperations {
     CachableBuilder cb =
         new CachableBuilder().fsPath(options.getFileSystem(), new Path(options.getFilename()))
             .conf(options.getConfiguration()).fileLen(options.getFileLenCache())
-            .data(options.getDataCache()).index(options.getIndexCache())
-            .readLimiter(options.getRateLimiter()).cryptoService(options.getCryptoService());
+            .cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter())
+            .cryptoService(options.getCryptoService());
     return new RFile.Reader(cb);
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MapFileIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MapFileIterator.java
index 5999ade..3f32edb 100644
--- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MapFileIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MapFileIterator.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.NoSuchMetaStoreException;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.file.map.MapFileUtil;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -164,4 +165,7 @@ public class MapFileIterator implements FileSKVIterator {
   public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
     return null;
   }
+
+  @Override
+  public void setCacheProvider(CacheProvider cacheProvider) {}
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SequenceFileIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SequenceFileIterator.java
index a8c3f92..a8b3118 100644
--- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SequenceFileIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SequenceFileIterator.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -136,4 +137,7 @@ public class SequenceFileIterator implements FileSKVIterator {
   public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public void setCacheProvider(CacheProvider cacheProvider) {}
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/DefaultScanDirectives.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/DefaultScanDirectives.java
new file mode 100644
index 0000000..a9b40f0
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/DefaultScanDirectives.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+/**
+ * This class is intentionally package private. Do not make public!
+ *
+ * <p>
+ * The purpose of this class is to avoid any object creation in the case where
+ * {@code ScanDirectives.builder().build()} is called.
+ */
+class DefaultScanDirectives extends ScanDirectivesImpl {
+
+  static DefaultScanDirectives DEFAULT_SCAN_DIRECTIVES = new DefaultScanDirectives();
+
+  private DefaultScanDirectives() {
+    super();
+    super.build();
+  }
+
+  @Override
+  public Builder setExecutorName(String name) {
+    return new ScanDirectivesImpl().setExecutorName(name);
+  }
+
+  @Override
+  public Builder setIndexCacheUsage(CacheUsage usage) {
+    return new ScanDirectivesImpl().setIndexCacheUsage(usage);
+  }
+
+  @Override
+  public Builder setDataCacheUsage(CacheUsage usage) {
+    return new ScanDirectivesImpl().setDataCacheUsage(usage);
+  }
+
+  @Override
+  public ScanDirectives build() {
+    return this;
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDirectives.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDirectives.java
new file mode 100644
index 0000000..92da619
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDirectives.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParameters;
+
+/**
+ * Encapsulates information about how a scan should be executed. This is the return type for
+ * {@link ScanDispatcher#dispatch(DispatchParameters)}. To create an instance of this use
+ * {@link ScanDirectives#builder()}
+ *
+ * @since 2.1.0
+ */
+public interface ScanDirectives {
+
+  /**
+   * Communicates how a scan should use cache.
+   *
+   * @since 2.1.0
+   */
+  enum CacheUsage {
+    /**
+     * Use cache for this can, possibly overriding table settings.
+     */
+    ENABLED,
+    /**
+     * Do not use cache for this scan, possibly overriding table settings.
+     */
+    DISABLED,
+    /**
+     * Use data if it exists in cache, but never load data into cache.
+     */
+    OPPORTUNISTIC,
+    /**
+     * Use the tables cache settings for this scan.
+     */
+    TABLE
+  }
+
+  public String getExecutorName();
+
+  public CacheUsage getDataCacheUsage();
+
+  public CacheUsage getIndexCacheUsage();
+
+  /**
+   * @since 2.1.0
+   */
+  public static interface Builder {
+
+    /**
+     * If this is not called, then {@value SimpleScanDispatcher#DEFAULT_SCAN_EXECUTOR_NAME} should
+     * be used.
+     *
+     * @param name
+     *          a non null name of an existing scan executor to use for this scan from the key set
+     *          of {@link ScanDispatcher.DispatchParameters#getScanExecutors()}
+     * @return may return self or a new object
+     */
+    public Builder setExecutorName(String name);
+
+    /**
+     * If this is not called, then {@link CacheUsage#TABLE} should be used.
+     *
+     * @param usage
+     *          a non null usage indicating how the scan should use cache for file metadata (like
+     *          the index tree within a file)
+     * @return may return self or a new object
+     */
+    public Builder setIndexCacheUsage(CacheUsage usage);
+
+    /**
+     * If this is not called, then {@link CacheUsage#TABLE} should be used.
+     *
+     * @param usage
+     *          a non null usage indicating how the scan should use cache for file data
+     * @return may return self or a new object
+     */
+    public Builder setDataCacheUsage(CacheUsage usage);
+
+    /**
+     * @return an immutable {@link ScanDirectives} object.
+     */
+    public ScanDirectives build();
+  }
+
+  /**
+   * @return a {@link ScanDirectives} builder
+   */
+  public static Builder builder() {
+    return DefaultScanDirectives.DEFAULT_SCAN_DIRECTIVES;
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDirectivesImpl.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDirectivesImpl.java
new file mode 100644
index 0000000..07ca40d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDirectivesImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Objects;
+
+import org.apache.accumulo.core.spi.scan.ScanDirectives.Builder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is intentionally package private. Do not make public!
+ */
+class ScanDirectivesImpl implements ScanDirectives, Builder {
+
+  // The purpose of this is to allow building an immutable ScanDirectives object without creating
+  // separate Builder and ScanDirectives objects. This is done to reduce object creation and
+  // copying. This could easily be changed to two objects without changing the interfaces.
+  private boolean built = false;
+
+  private String executorName;
+  private CacheUsage indexCacheUsage;
+  private CacheUsage dataCacheUsage;
+
+  ScanDirectivesImpl() {
+    executorName = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+    indexCacheUsage = CacheUsage.TABLE;
+    dataCacheUsage = CacheUsage.TABLE;
+  }
+
+  @Override
+  public String getExecutorName() {
+    Preconditions.checkState(built);
+    return executorName;
+  }
+
+  @Override
+  public Builder setExecutorName(String name) {
+    Preconditions.checkState(!built);
+    this.executorName = Objects.requireNonNull(name);
+    return this;
+  }
+
+  @Override
+  public ScanDirectives build() {
+    Preconditions.checkState(!built);
+    built = true;
+    return this;
+  }
+
+  @Override
+  public Builder setIndexCacheUsage(CacheUsage usage) {
+    Preconditions.checkState(!built);
+    this.indexCacheUsage = Objects.requireNonNull(usage);
+    return this;
+  }
+
+  @Override
+  public Builder setDataCacheUsage(CacheUsage usage) {
+    Preconditions.checkState(!built);
+    this.dataCacheUsage = Objects.requireNonNull(usage);
+    return this;
+  }
+
+  @Override
+  public CacheUsage getDataCacheUsage() {
+    Preconditions.checkState(built);
+    return dataCacheUsage;
+  }
+
+  @Override
+  public CacheUsage getIndexCacheUsage() {
+    Preconditions.checkState(built);
+    return indexCacheUsage;
+  }
+
+  @Override
+  public String toString() {
+    return "{executorName=" + executorName + ", indexCacheUsage=" + indexCacheUsage
+        + ", dataCacheUsage=" + dataCacheUsage + ", built=" + built + "}";
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
index 256f031..a8d83ef 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
@@ -68,7 +68,10 @@ public interface ScanDispatcher {
    * future.
    *
    * @since 2.0.0
+   * @deprecated since 2.1.0 replaced by {@link DispatchParameters} and
+   *             {@link ScanDispatcher#dispatch(DispatchParameters)}
    */
+  @Deprecated
   public static interface DispatchParmaters {
     /**
      * @return information about the scan to be dispatched.
@@ -85,6 +88,50 @@ public interface ScanDispatcher {
 
   /**
    * @return Should return one of the executors named params.getScanExecutors().keySet()
+   *
+   * @deprecated since 2.1.0 please implement {@link #dispatch(DispatchParameters)} instead of this.
+   *             Accumulo will only call {@link #dispatch(DispatchParameters)} directly, it will
+   *             never call this. However the default implementation of
+   *             {@link #dispatch(DispatchParameters)} calls this method.
+   */
+  @Deprecated
+  default String dispatch(DispatchParmaters params) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * The method parameters for {@link ScanDispatcher#dispatch(DispatchParameters)}. This interface
+   * exists so the API can evolve and additional parameters can be passed to the method in the
+   * future.
+   *
+   * @since 2.1.0
    */
-  String dispatch(DispatchParmaters params);
+  public static interface DispatchParameters {
+    /**
+     * @return information about the scan to be dispatched.
+     */
+    ScanInfo getScanInfo();
+
+    /**
+     * @return the currently configured scan executors
+     */
+    Map<String,ScanExecutor> getScanExecutors();
+
+    ServiceEnvironment getServiceEnv();
+  }
+
+  /**
+   * Accumulo calls this method for each scan batch to determine what executor to use and how to
+   * utilize cache for the scan.
+   *
+   * @since 2.1.0
+   */
+
+  default ScanDirectives dispatch(DispatchParameters params) {
+    String executor = dispatch((DispatchParmaters) params);
+    if (executor.equals(DefaultScanDirectives.DEFAULT_SCAN_DIRECTIVES.getExecutorName()))
+      return DefaultScanDirectives.DEFAULT_SCAN_DIRECTIVES;
+
+    return ScanDirectives.builder().setExecutorName(executor).build();
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
index 12e9194..c0c80cb 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -18,16 +18,24 @@
  */
 package org.apache.accumulo.core.spi.scan;
 
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.spi.scan.ScanDirectives.CacheUsage;
 
 import com.google.common.collect.ImmutableMap;
 
 /**
- * If no options are given, then this will dispatch to an executor named {@code default}. This
- * dispatcher supports the following options.
+ * If no options are given, then this will default to an executor named {@code default} and
+ * {@link CacheUsage#TABLE} for index and data cache. This dispatcher supports the following
+ * options.
  *
  * <UL>
  * <LI>{@code table.scan.dispatcher.opts.executor=<scan executor name>} : dispatches all scans to
@@ -39,7 +47,8 @@ import com.google.common.collect.ImmutableMap;
  * <LI>{@code table.scan.dispatcher.opts.executor.<type>=<scan executor name>} : dispatches scans
  * that set the hint {@code scan_type=<type>} to the named executor. If this setting matches then it
  * takes precedence over all other settings. See {@link ScannerBase#setExecutionHints(Map)}</LI>
- *
+ * <LI>{@code table.scan.dispatcher.opts.cacheUsage.<type>[.index|.data]=enabled|disabled|opportunistic|table}
+ * : for scans that set the hint {@code scan_type=<type>} determines how the scan will use cache.
  * </UL>
  *
  * The {@code multi_executor} and {@code single_executor} options override the {@code executor}
@@ -51,10 +60,12 @@ public class SimpleScanDispatcher implements ScanDispatcher {
   private final String EXECUTOR_PREFIX = "executor.";
 
   private final Set<String> VALID_OPTS = Set.of("executor", "multi_executor", "single_executor");
-  private String multiExecutor;
-  private String singleExecutor;
 
-  private Map<String,String> typeExecutors;
+  private ScanDirectives singleDirectives;
+  private ScanDirectives multiDirectives;
+  private Map<String,Map<ScanInfo.Type,ScanDirectives>> hintDirectives;
+
+  private static Pattern CACHE_PATTERN = Pattern.compile("cacheUsage[.](\\w+)([.](index|data))?");
 
   public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
 
@@ -62,46 +73,96 @@ public class SimpleScanDispatcher implements ScanDispatcher {
   public void init(InitParameters params) {
     Map<String,String> options = params.getOptions();
 
-    var teb = ImmutableMap.<String,String>builder();
+    Map<String,CacheUsage> indexCacheUsage = new HashMap<>();
+    Map<String,CacheUsage> dataCacheUsage = new HashMap<>();
+    Map<String,String> scanExecutors = new HashMap<>();
+    Set<String> hintScanTypes = new HashSet<>();
 
     options.forEach((k, v) -> {
+
+      Matcher cacheMatcher = CACHE_PATTERN.matcher(k);
+
       if (k.startsWith(EXECUTOR_PREFIX)) {
-        String type = k.substring(EXECUTOR_PREFIX.length());
-        teb.put(type, v);
+        String hintScanType = k.substring(EXECUTOR_PREFIX.length());
+        scanExecutors.put(hintScanType, v);
+        hintScanTypes.add(hintScanType);
+      } else if (cacheMatcher.matches()) {
+        String hintScanType = cacheMatcher.group(1);
+        CacheUsage usage = CacheUsage.valueOf(v.toUpperCase());
+        String cacheType = cacheMatcher.group(3);
+
+        hintScanTypes.add(hintScanType);
+
+        if ("index".equals(cacheType)) {
+          indexCacheUsage.put(hintScanType, usage);
+        } else if ("data".equals(cacheType)) {
+          dataCacheUsage.put(hintScanType, usage);
+        } else {
+          indexCacheUsage.put(hintScanType, usage);
+          dataCacheUsage.put(hintScanType, usage);
+        }
       } else if (!VALID_OPTS.contains(k)) {
         throw new IllegalArgumentException("Invalid option " + k);
       }
     });
 
-    typeExecutors = teb.build();
+    // This method pre-computes all possible scan directives objects that could ever be needed.
+    // This is done to make the dispatch method more efficient. If the number of config permutations
+    // grows, this approach may have to be abandoned. For now its tractable.
+
+    ScanDirectives baseDirectives = Optional.ofNullable(options.get("executor"))
+        .map(name -> ScanDirectives.builder().setExecutorName(name).build())
+        .orElse(DefaultScanDirectives.DEFAULT_SCAN_DIRECTIVES);
+    singleDirectives = Optional.ofNullable(options.get("single_executor"))
+        .map(name -> ScanDirectives.builder().setExecutorName(name).build()).orElse(baseDirectives);
+    multiDirectives = Optional.ofNullable(options.get("multi_executor"))
+        .map(name -> ScanDirectives.builder().setExecutorName(name).build()).orElse(baseDirectives);
 
-    String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
-    multiExecutor = options.getOrDefault("multi_executor", base);
-    singleExecutor = options.getOrDefault("single_executor", base);
+    var stpb = ImmutableMap.<String,Map<ScanInfo.Type,ScanDirectives>>builder();
 
+    for (String hintScanType : hintScanTypes) {
+      EnumMap<ScanInfo.Type,ScanDirectives> precomupted = new EnumMap<>(ScanInfo.Type.class);
+
+      precomupted.put(ScanInfo.Type.SINGLE, ScanDirectives.builder()
+          .setExecutorName(
+              scanExecutors.getOrDefault(hintScanType, singleDirectives.getExecutorName()))
+          .setIndexCacheUsage(indexCacheUsage.getOrDefault(hintScanType, CacheUsage.TABLE))
+          .setDataCacheUsage(dataCacheUsage.getOrDefault(hintScanType, CacheUsage.TABLE)).build());
+
+      precomupted.put(ScanInfo.Type.MULTI, ScanDirectives.builder()
+          .setExecutorName(
+              scanExecutors.getOrDefault(hintScanType, multiDirectives.getExecutorName()))
+          .setIndexCacheUsage(indexCacheUsage.getOrDefault(hintScanType, CacheUsage.TABLE))
+          .setDataCacheUsage(dataCacheUsage.getOrDefault(hintScanType, CacheUsage.TABLE)).build());
+
+      stpb.put(hintScanType, precomupted);
+    }
+
+    hintDirectives = stpb.build();
   }
 
   @Override
-  public String dispatch(DispatchParmaters params) {
+  public ScanDirectives dispatch(DispatchParameters params) {
     ScanInfo scanInfo = params.getScanInfo();
 
-    if (!typeExecutors.isEmpty()) {
-      String scanType = scanInfo.getExecutionHints().get("scan_type");
-      if (scanType != null) {
-        String executor = typeExecutors.get(scanType);
-        if (executor != null) {
-          return executor;
+    if (!hintDirectives.isEmpty()) {
+      String hintScanType = scanInfo.getExecutionHints().get("scan_type");
+      if (hintScanType != null) {
+        var precomputedDirectives = hintDirectives.get(hintScanType);
+        if (precomputedDirectives != null) {
+          return precomputedDirectives.get(scanInfo.getScanType());
         }
       }
     }
 
     switch (scanInfo.getScanType()) {
       case MULTI:
-        return multiExecutor;
+        return multiDirectives;
       case SINGLE:
-        return singleExecutor;
+        return singleDirectives;
       default:
         throw new IllegalArgumentException("Unexpected scan type " + scanInfo.getScanType());
     }
+
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index aa85e31..ce18259 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.function.Predicate;
 
 import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
@@ -191,7 +192,7 @@ public class SummaryReader {
       // only summary data is wanted.
       CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
       CachableBuilder cb = new CachableBuilder().fsPath(fs, file).conf(conf).fileLen(fileLenCache)
-          .index(compositeCache).cryptoService(cryptoService);
+          .cacheProvider(new BasicCacheProvider(compositeCache, null)).cryptoService(cryptoService);
       bcReader = new CachableBlockFile.Reader(cb);
       return load(bcReader, summarySelector, factory);
     } catch (FileNotFoundException fne) {
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index bf7bd11..b035291 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -70,6 +70,7 @@ import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguratio
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
 import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager;
+import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
@@ -310,7 +311,7 @@ public class RFileTest {
       LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA);
 
       CachableBuilder cb = new CachableBuilder().cacheId("source-1").input(in).length(fileLength)
-          .conf(conf).data(dataCache).index(indexCache).cryptoService(
+          .conf(conf).cacheProvider(new BasicCacheProvider(indexCache, dataCache)).cryptoService(
               CryptoServiceFactory.newInstance(accumuloConfiguration, ClassloaderType.JAVA));
       reader = new RFile.Reader(cb);
       if (cfsi)
@@ -1748,7 +1749,8 @@ public class RFileTest {
     manager.start(new BlockCacheConfiguration(aconf));
     CachableBuilder cb = new CachableBuilder().input(in2).length(data.length).conf(hadoopConf)
         .cryptoService(CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA))
-        .index(manager.getBlockCache(CacheType.INDEX)).data(manager.getBlockCache(CacheType.DATA));
+        .cacheProvider(new BasicCacheProvider(manager.getBlockCache(CacheType.INDEX),
+            manager.getBlockCache(CacheType.DATA)));
     Reader reader = new RFile.Reader(cb);
     checkIndex(reader);
 
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
index 5cdb9f8..b1082b4 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.accumulo.core.spi.scan;
 
+import static org.apache.accumulo.core.spi.scan.ScanDirectives.CacheUsage.DISABLED;
+import static org.apache.accumulo.core.spi.scan.ScanDirectives.CacheUsage.ENABLED;
+import static org.apache.accumulo.core.spi.scan.ScanDirectives.CacheUsage.OPPORTUNISTIC;
+import static org.apache.accumulo.core.spi.scan.ScanDirectives.CacheUsage.TABLE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -28,7 +32,8 @@ import java.util.Map;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
-import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParmaters;
+import org.apache.accumulo.core.spi.scan.ScanDirectives.CacheUsage;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParameters;
 import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
 import org.junit.Test;
 
@@ -41,7 +46,7 @@ public class SimpleScanDispatcherTest {
         .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".prioritizer"));
   }
 
-  private static class DispatchParametersImps implements DispatchParmaters {
+  private static class DispatchParametersImps implements DispatchParameters {
 
     private ScanInfo si;
     private Map<String,ScanExecutor> se;
@@ -69,7 +74,7 @@ public class SimpleScanDispatcherTest {
   }
 
   private void runTest(Map<String,String> opts, Map<String,String> hints, String expectedSingle,
-      String expectedMulti) {
+      String expectedMulti, CacheUsage expectedIndexCU, CacheUsage expectedDataCU) {
     TestScanInfo msi = new TestScanInfo("a", Type.MULTI, 4);
     msi.executionHints = hints;
     TestScanInfo ssi = new TestScanInfo("a", Type.SINGLE, 4);
@@ -100,12 +105,19 @@ public class SimpleScanDispatcherTest {
     executors.put("E2", null);
     executors.put("E3", null);
 
-    assertEquals(expectedMulti, ssd1.dispatch(new DispatchParametersImps(msi, executors)));
-    assertEquals(expectedSingle, ssd1.dispatch(new DispatchParametersImps(ssi, executors)));
+    ScanDirectives multiPrefs = ssd1.dispatch(new DispatchParametersImps(msi, executors));
+    assertEquals(expectedMulti, multiPrefs.getExecutorName());
+    assertEquals(expectedIndexCU, multiPrefs.getIndexCacheUsage());
+    assertEquals(expectedDataCU, multiPrefs.getDataCacheUsage());
+
+    ScanDirectives singlePrefs = ssd1.dispatch(new DispatchParametersImps(ssi, executors));
+    assertEquals(expectedSingle, singlePrefs.getExecutorName());
+    assertEquals(expectedIndexCU, singlePrefs.getIndexCacheUsage());
+    assertEquals(expectedDataCU, singlePrefs.getDataCacheUsage());
   }
 
   private void runTest(Map<String,String> opts, String expectedSingle, String expectedMulti) {
-    runTest(opts, Collections.emptyMap(), expectedSingle, expectedMulti);
+    runTest(opts, Collections.emptyMap(), expectedSingle, expectedMulti, TABLE, TABLE);
   }
 
   @Test
@@ -124,10 +136,31 @@ public class SimpleScanDispatcherTest {
 
   @Test
   public void testHints() {
-    runTest(Map.of("executor", "E1"), Map.of("scan_type", "quick"), "E1", "E1");
+    runTest(Map.of("executor", "E1"), Map.of("scan_type", "quick"), "E1", "E1", TABLE, TABLE);
     runTest(Map.of("executor", "E1", "executor.quick", "E2"), Map.of("scan_type", "quick"), "E2",
-        "E2");
+        "E2", TABLE, TABLE);
     runTest(Map.of("executor", "E1", "executor.quick", "E2", "executor.slow", "E3"),
-        Map.of("scan_type", "slow"), "E3", "E3");
+        Map.of("scan_type", "slow"), "E3", "E3", TABLE, TABLE);
+  }
+
+  @Test
+  public void testCache() {
+    String dname = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+    runTest(
+        Map.of("executor", "E1", "cacheUsage.slow.index", "opportunistic", "cacheUsage.slow.data",
+            "disabled", "cacheUsage.fast", "enabled", "executor.slow", "E2"),
+        Map.of("scan_type", "slow"), "E2", "E2", OPPORTUNISTIC, DISABLED);
+    runTest(
+        Map.of("single_executor", "E1", "cacheUsage.slow.index", "opportunistic",
+            "cacheUsage.slow.data", "disabled", "cacheUsage.fast", "enabled"),
+        Map.of("scan_type", "fast"), "E1", dname, ENABLED, ENABLED);
+    runTest(
+        Map.of("executor", "E1", "cacheUsage.slow.index", "opportunistic", "cacheUsage.slow.data",
+            "disabled", "cacheUsage.fast", "enabled"),
+        Map.of("scan_type", "notconfigured"), "E1", "E1", TABLE, TABLE);
+    runTest(Map.of("executor", "E1", "cacheUsage.slow.index", "opportunistic",
+        "cacheUsage.slow.data", "disabled", "cacheUsage.fast", "enabled"), Map.of(), "E1", "E1",
+        TABLE, TABLE);
   }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index e2c682b..3fdaa6a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
@@ -46,7 +47,6 @@ import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator.Dat
 import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -111,12 +111,6 @@ public class FileManager {
 
   private VolumeManager fs;
 
-  // the data cache and index cache are allocated in
-  // TabletResourceManager and passed through the file opener to
-  // CachableBlockFile which can handle the caches being
-  // null if unallocated
-  private BlockCache dataCache = null;
-  private BlockCache indexCache = null;
   private Cache<String,Long> fileLenCache;
 
   private long maxIdleTime;
@@ -163,21 +157,12 @@ public class FileManager {
 
   }
 
-  /**
-   *
-   * @param dataCache
-   *          : underlying file can and should be able to handle a null cache
-   * @param indexCache
-   *          : underlying file can and should be able to handle a null cache
-   */
   public FileManager(ServerContext context, VolumeManager fs, int maxOpen,
-      Cache<String,Long> fileLenCache, BlockCache dataCache, BlockCache indexCache) {
+      Cache<String,Long> fileLenCache) {
 
     if (maxOpen <= 0)
       throw new IllegalArgumentException("maxOpen <= 0");
     this.context = context;
-    this.dataCache = dataCache;
-    this.indexCache = indexCache;
     this.fileLenCache = fileLenCache;
 
     this.filePermits = new Semaphore(maxOpen, false);
@@ -277,7 +262,7 @@ public class FileManager {
   }
 
   private Map<FileSKVIterator,String> reserveReaders(KeyExtent tablet, Collection<String> files,
-      boolean continueOnFailure) throws IOException {
+      boolean continueOnFailure, CacheProvider cacheProvider) throws IOException {
 
     if (!tablet.isMeta() && files.size() >= maxOpen) {
       throw new IllegalArgumentException("requested files exceeds max open");
@@ -322,6 +307,8 @@ public class FileManager {
       }
     }
 
+    readersReserved.forEach((k, v) -> k.setCacheProvider(cacheProvider));
+
     // close files before opening files to ensure we stay under resource
     // limitations
     closeReaders(filesToClose);
@@ -338,7 +325,7 @@ public class FileManager {
             .forFile(path.toString(), ns, ns.getConf(), context.getCryptoService())
             .withTableConfiguration(
                 context.getServerConfFactory().getTableConfiguration(tablet.getTableId()))
-            .withBlockCache(dataCache, indexCache).withFileLenCache(fileLenCache).build();
+            .withCacheProvider(cacheProvider).withFileLenCache(fileLenCache).build();
         readersReserved.put(reader, file);
       } catch (Exception e) {
 
@@ -491,11 +478,13 @@ public class FileManager {
     private ArrayList<FileSKVIterator> tabletReservedReaders;
     private KeyExtent tablet;
     private boolean continueOnFailure;
+    private CacheProvider cacheProvider;
 
-    ScanFileManager(KeyExtent tablet) {
+    ScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) {
       tabletReservedReaders = new ArrayList<>();
       dataSources = new ArrayList<>();
       this.tablet = tablet;
+      this.cacheProvider = cacheProvider;
 
       continueOnFailure = context.getServerConfFactory().getTableConfiguration(tablet.getTableId())
           .getBoolean(Property.TABLE_FAILURES_IGNORE);
@@ -526,7 +515,7 @@ public class FileManager {
       }
 
       Map<FileSKVIterator,String> newlyReservedReaders =
-          reserveReaders(tablet, files, continueOnFailure);
+          reserveReaders(tablet, files, continueOnFailure, cacheProvider);
 
       tabletReservedReaders.addAll(newlyReservedReaders.keySet());
       return newlyReservedReaders;
@@ -639,7 +628,7 @@ public class FileManager {
     }
   }
 
-  public ScanFileManager newScanFileManager(KeyExtent tablet) {
-    return new ScanFileManager(tablet);
+  public ScanFileManager newScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) {
+    return new ScanFileManager(tablet, cacheProvider);
   }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 2ae1f26..72c5163 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -53,12 +53,15 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
+import org.apache.accumulo.core.file.blockfile.impl.ScanCacheProvider;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCacheManager;
 import org.apache.accumulo.core.spi.cache.CacheType;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.scan.ScanDirectives;
 import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParameters;
 import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParmaters;
 import org.apache.accumulo.core.spi.scan.ScanExecutor;
 import org.apache.accumulo.core.spi.scan.ScanInfo;
@@ -425,8 +428,7 @@ public class TabletServerResourceManager {
     fileLenCache =
         CacheBuilder.newBuilder().maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build();
 
-    fileManager = new FileManager(context, context.getVolumeManager(), maxOpenFiles, fileLenCache,
-        _dCache, _iCache);
+    fileManager = new FileManager(context, context.getVolumeManager(), maxOpenFiles, fileLenCache);
 
     memoryManager = Property.createInstanceFromPropertyName(acuConf, Property.TSERV_MEM_MGMT,
         MemoryManager.class, new LargestFirstMemoryManager());
@@ -768,11 +770,13 @@ public class TabletServerResourceManager {
       lastReportedCommitTime = System.currentTimeMillis();
     }
 
-    public synchronized ScanFileManager newScanFileManager() {
+    public synchronized ScanFileManager newScanFileManager(ScanDirectives scanDirectives) {
       if (closed) {
         throw new IllegalStateException("closed");
       }
-      return fileManager.newScanFileManager(extent);
+
+      return fileManager.newScanFileManager(extent,
+          new ScanCacheProvider(tableConf, scanDirectives, _iCache, _dCache));
     }
 
     // END methods that Tablets call to manage their set of open map files
@@ -924,17 +928,27 @@ public class TabletServerResourceManager {
     }
   }
 
+  @SuppressWarnings("deprecation")
+  private static abstract class DispatchParamsImpl
+      implements DispatchParameters, DispatchParmaters {
+
+  }
+
   public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, ScanSession scanInfo,
       Runnable task) {
 
     task = ScanSession.wrap(scanInfo, task);
 
     if (tablet.isRootTablet()) {
+      // TODO make meta dispatch??
+      scanInfo.scanParams.setScanDirectives(ScanDirectives.builder().build());
       task.run();
     } else if (tablet.isMeta()) {
+      // TODO make meta dispatch??
+      scanInfo.scanParams.setScanDirectives(ScanDirectives.builder().build());
       scanExecutors.get("meta").execute(task);
     } else {
-      String scanExecutorName = dispatcher.dispatch(new DispatchParmaters() {
+      DispatchParameters params = new DispatchParamsImpl() {
         @Override
         public ScanInfo getScanInfo() {
           return scanInfo;
@@ -949,14 +963,18 @@ public class TabletServerResourceManager {
         public ServiceEnvironment getServiceEnv() {
           return new ServiceEnvironmentImpl(context);
         }
-      });
-      ExecutorService executor = scanExecutors.get(scanExecutorName);
+      };
+
+      ScanDirectives prefs = dispatcher.dispatch(params);
+      scanInfo.scanParams.setScanDirectives(prefs);
+
+      ExecutorService executor = scanExecutors.get(prefs.getExecutorName());
       if (executor == null) {
         log.warn(
             "For table id {}, {} dispatched to non-existant executor {} Using default executor.",
-            tablet.getTableId(), dispatcher.getClass().getName(), scanExecutorName);
+            tablet.getTableId(), dispatcher.getClass().getName(), prefs.getExecutorName());
         executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
-      } else if ("meta".equals(scanExecutorName)) {
+      } else if ("meta".equals(prefs.getExecutorName())) {
         log.warn("For table id {}, {} dispatched to meta executor. Using default executor.",
             tablet.getTableId(), dispatcher.getClass().getName());
         executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java
index 79b0bab..513058b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.scan.ScanDirectives;
 
 /**
  * Information needed to execute a scan inside a tablet
@@ -42,6 +43,7 @@ public final class ScanParameters {
   private final SamplerConfiguration samplerConfig;
   private final long batchTimeOut;
   private final String classLoaderContext;
+  private volatile ScanDirectives directives;
 
   public ScanParameters(int maxEntries, Authorizations authorizations, Set<Column> columnSet,
       List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, boolean isolated,
@@ -95,6 +97,14 @@ public final class ScanParameters {
     return classLoaderContext;
   }
 
+  public void setScanDirectives(ScanDirectives directives) {
+    this.directives = directives;
+  }
+
+  public ScanDirectives getScanDirectives() {
+    return directives;
+  }
+
   @Override
   public String toString() {
     StringBuilder buf = new StringBuilder();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index a9750dd..94eac68 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -140,7 +140,8 @@ class ScanDataSource implements DataSource {
 
       // only acquire the file manager when we know the tablet is open
       if (fileManager == null) {
-        fileManager = tablet.getTabletResources().newScanFileManager();
+        fileManager =
+            tablet.getTabletResources().newScanFileManager(scanParams.getScanDirectives());
         tablet.addActiveScans(this);
       }
 
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
index 4bac3f7..014a0c4 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -160,6 +161,9 @@ public class DefaultCompactionStrategyTest {
       return null;
     }
 
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {}
+
   }
 
   static final DefaultConfiguration dfault = DefaultConfiguration.getInstance();