You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/07/10 22:02:32 UTC

[accumulo] branch master updated (f2d0371 -> 5e5c74f)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git.


    from f2d0371  Added additional client properties for Scanner and BatchScanner (#548)
     add d19638a  ACCUMULO-3510 Updated scheduler to use comparators for sessions so that we can have a priority queue of scans
     add e43b0df  Added table specific thread pools (part of ACCUMULO-4074 which is TBD)
     new aaa757e  ACCUMULO-4074 Added support for multiple scan executors (#549)
     new 5e5c74f  Merge branch 'ACCUMULO-4074-2'

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/pom.xml                                       |  19 ++
 .../accumulo/core/conf/AccumuloConfiguration.java  | 136 ++++++++++++++
 .../accumulo/core/conf/DefaultConfiguration.java   |   5 +
 .../org/apache/accumulo/core/conf/Property.java    |  50 ++++-
 .../accumulo/core/conf/SiteConfiguration.java      |  10 +
 .../core/spi/common/IteratorConfiguration.java     |  21 ++-
 .../org/apache/accumulo/core/spi/common/Stats.java |  31 ++--
 .../core/spi/scan/IdleRatioScanPrioritizer.java    |  51 ++++++
 .../accumulo/core/spi/scan/ScanDispatcher.java     |  52 ++++++
 .../accumulo/core/spi/scan/ScanExecutor.java       |  53 +++---
 .../apache/accumulo/core/spi/scan/ScanInfo.java    | 116 ++++++++++++
 .../accumulo/core/spi/scan/ScanPrioritizer.java    |  20 +-
 .../core/spi/scan/SimpleScanDispatcher.java        |  74 ++++++++
 ....java => AccumuloUncaughtExceptionHandler.java} |  21 +--
 .../accumulo/core/util/NamingThreadFactory.java    |  19 +-
 .../java/org/apache/accumulo/core/util/Stat.java   |  75 ++++----
 .../core/conf/AccumuloConfigurationTest.java       | 112 +++++++++++-
 .../spi/scan/IdleRatioScanPrioritizerTest.java     |  61 +++++++
 .../core/spi/scan/SimpleScanDispatcherTest.java    |  62 +++++++
 .../accumulo/core/spi/scan/TestScanInfo.java       | 101 +++++++++++
 .../org/apache/accumulo/core/util/StatTest.java    |  40 ++--
 .../accumulo/server/conf/TableConfiguration.java   |  37 ++++
 .../accumulo/server/conf/ZooConfiguration.java     |  13 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  41 +++--
 .../tserver/TabletServerResourceManager.java       | 202 ++++++++++++++++++---
 .../accumulo/tserver/scan/NextBatchTask.java       |   7 +-
 .../accumulo/tserver/session/MultiScanSession.java |  22 ++-
 .../accumulo/tserver/session/ScanSession.java      | 162 +++++++++++++----
 .../apache/accumulo/tserver/session/Session.java   |   2 +-
 .../accumulo/tserver/session/SessionManager.java   |   8 +-
 .../{ScanSession.java => SingleScanSession.java}   |  29 ++-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   9 +-
 .../org/apache/accumulo/test/IMMLGBenchmark.java   |   2 +-
 .../test/performance/scan/CollectTabletStats.java  |   5 +-
 34 files changed, 1414 insertions(+), 254 deletions(-)
 copy proxy/src/main/cpp/proxy_constants.cpp => core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java (73%)
 copy server/base/src/main/java/org/apache/accumulo/server/replication/WorkAssigner.java => core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java (62%)
 create mode 100644 core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
 create mode 100644 core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
 copy server/monitor/src/main/java/org/apache/accumulo/monitor/rest/trace/TraceType.java => core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java (52%)
 create mode 100644 core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
 copy server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java => core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java (68%)
 create mode 100644 core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
 copy core/src/main/java/org/apache/accumulo/core/util/{ComparablePair.java => AccumuloUncaughtExceptionHandler.java} (66%)
 create mode 100644 core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
 create mode 100644 core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
 create mode 100644 core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
 copy server/tserver/src/main/java/org/apache/accumulo/tserver/session/{ScanSession.java => SingleScanSession.java} (80%)


[accumulo] 01/02: ACCUMULO-4074 Added support for multiple scan executors (#549)

Posted by kt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit aaa757e4372a22b17201c1bee96b243b620345d3
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Jul 3 17:08:40 2018 -0400

    ACCUMULO-4074 Added support for multiple scan executors (#549)
    
     * Created plugin API for prioritizing scans that avoids accessing internal
       types.  This should make user written comparators more stable over time.
     * Made it possible to configure a comparator for each scan executor. Was
       previously a single comparator type for all executors.
     * Made it possible to pass options for comparator creation.
     * Made it possible to configure a per table dispatcher. This dispatcher
       chooses which scan executor to use.  Dispatchers are user-written plugins.
     * Fixed comparator type casting. I don't think the comparators in #510 worked
       because the Runnables were never the type expected. Also, abstracted all of
       this away from a user who may write a comparator.
---
 core/pom.xml                                       |  19 ++
 .../accumulo/core/conf/AccumuloConfiguration.java  | 136 +++++++++
 .../accumulo/core/conf/DefaultConfiguration.java   |   5 +
 .../org/apache/accumulo/core/conf/Property.java    |  55 +++-
 .../accumulo/core/conf/SiteConfiguration.java      |  10 +
 .../core/spi/common/IteratorConfiguration.java     |  30 +-
 .../common/Stats.java}                             |  36 ++-
 .../core/spi/scan/IdleRatioScanPrioritizer.java    |  51 ++++
 .../accumulo/core/spi/scan/ScanDispatcher.java     |  52 ++++
 .../accumulo/core/spi/scan/ScanExecutor.java       |  60 ++++
 .../apache/accumulo/core/spi/scan/ScanInfo.java    | 116 ++++++++
 .../accumulo/core/spi/scan/ScanPrioritizer.java    |  22 +-
 .../core/spi/scan/SimpleScanDispatcher.java        |  74 +++++
 .../util/AccumuloUncaughtExceptionHandler.java     |   1 -
 .../accumulo/core/util/NamingThreadFactory.java    |  16 +-
 .../java/org/apache/accumulo/core/util/Stat.java   |  75 ++---
 .../core/conf/AccumuloConfigurationTest.java       | 112 +++++++-
 .../spi/scan/IdleRatioScanPrioritizerTest.java     |  61 +++++
 .../core/spi/scan/SimpleScanDispatcherTest.java    |  62 +++++
 .../accumulo/core/spi/scan/TestScanInfo.java       | 101 +++++++
 .../org/apache/accumulo/core/util/StatTest.java    |  40 ++-
 .../accumulo/server/conf/TableConfiguration.java   |  37 +++
 .../accumulo/server/conf/ZooConfiguration.java     |  13 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  41 ++-
 .../tserver/TabletServerResourceManager.java       | 304 +++++++++++----------
 .../accumulo/tserver/scan/NextBatchTask.java       |   7 +-
 .../tserver/session/DefaultSessionComparator.java  |  67 -----
 .../accumulo/tserver/session/MultiScanSession.java |  32 +--
 .../accumulo/tserver/session/ScanSession.java      | 162 ++++++++---
 .../apache/accumulo/tserver/session/Session.java   |  18 +-
 .../accumulo/tserver/session/SessionManager.java   |  11 +-
 .../{ScanSession.java => SingleScanSession.java}   |  29 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   3 +-
 .../tserver/session/SessionComparatorTest.java     | 170 ------------
 .../org/apache/accumulo/test/IMMLGBenchmark.java   |   2 +-
 .../test/performance/scan/CollectTabletStats.java  |   5 +-
 36 files changed, 1411 insertions(+), 624 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index cf45283..4535f50 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -212,6 +212,25 @@
               </allows>
             </configuration>
           </execution>
+          <execution>
+            <id>apilyzer-spi</id>
+            <goals>
+              <goal>analyze</goal>
+            </goals>
+            <configuration>
+              <outputFile>${project.build.directory}/apilyzer-spi.txt</outputFile>
+              <includes>
+                <include>org[.]apache[.]accumulo[.]core[.]spi[.].*</include>
+              </includes>
+              <excludes />
+              <allows>
+                <allow>org[.]apache[.]hadoop[.]io[.]Text</allow>
+                <allow>org[.]apache[.]accumulo[.]core[.]client(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+                <allow>org[.]apache[.]accumulo[.]core[.]data(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+                <allow>org[.]apache[.]accumulo[.]core[.]security(?!.*[.](crypto)[.].*)[.].*</allow>
+              </allows>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
index 5a9ce21..5427d79 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
@@ -16,12 +16,17 @@
  */
 package org.apache.accumulo.core.conf;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -29,10 +34,12 @@ import java.util.function.Predicate;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.PropertyType.PortRange;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 /**
@@ -348,6 +355,135 @@ public abstract class AccumuloConfiguration implements Iterable<Entry<String,Str
     return maxFilesPerTablet;
   }
 
+  public class ScanExecutorConfig {
+    public final String name;
+    public final int maxThreads;
+    public final OptionalInt priority;
+    public final Optional<String> prioritizerClass;
+    public final Map<String,String> prioritizerOpts;
+
+    public ScanExecutorConfig(String name, int maxThreads, OptionalInt priority,
+        Optional<String> comparatorFactory, Map<String,String> comparatorFactoryOpts) {
+      this.name = name;
+      this.maxThreads = maxThreads;
+      this.priority = priority;
+      this.prioritizerClass = comparatorFactory;
+      this.prioritizerOpts = comparatorFactoryOpts;
+    }
+
+    /**
+     * Re-reads the max threads from the configuration that created this class
+     */
+    public int getCurrentMaxThreads() {
+      Integer depThreads = getDeprecatedScanThreads(name);
+      if (depThreads != null) {
+        return depThreads;
+      }
+
+      String prop = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + name + "." + SCAN_EXEC_THREADS;
+      String val = getAllPropertiesWithPrefix(Property.TSERV_SCAN_EXECUTORS_PREFIX).get(prop);
+      return Integer.parseInt(val);
+    }
+  }
+
+  public boolean isPropertySet(Property prop) {
+    throw new UnsupportedOperationException();
+  }
+
+  @SuppressWarnings("deprecation")
+  Integer getDeprecatedScanThreads(String name) {
+
+    Property prop;
+    Property deprecatedProp;
+
+    if (name.equals(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME)) {
+      prop = Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS;
+      deprecatedProp = Property.TSERV_READ_AHEAD_MAXCONCURRENT;
+    } else if (name.equals("meta")) {
+      prop = Property.TSERV_SCAN_EXECUTORS_META_THREADS;
+      deprecatedProp = Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT;
+    } else {
+      return null;
+    }
+
+    if (!isPropertySet(prop) && isPropertySet(deprecatedProp)) {
+      log.warn("Property {} is deprecated, use {} instead.", prop.getKey(),
+          deprecatedProp.getKey());
+      return Integer.valueOf(get(deprecatedProp));
+    } else if (isPropertySet(prop) && isPropertySet(deprecatedProp)) {
+      log.warn("Deprecated property {} ignored because {} is set", deprecatedProp.getKey(),
+          prop.getKey());
+    }
+
+    return null;
+  }
+
+  private static final String SCAN_EXEC_THREADS = "threads";
+  private static final String SCAN_EXEC_PRIORITY = "priority";
+  private static final String SCAN_EXEC_PRIORITIZER = "prioritizer";
+  private static final String SCAN_EXEC_PRIORITIZER_OPTS = "prioritizer.opts.";
+
+  public Collection<ScanExecutorConfig> getScanExecutors() {
+
+    Map<String,Map<String,String>> propsByName = new HashMap<>();
+
+    List<ScanExecutorConfig> scanResources = new ArrayList<>();
+
+    for (Entry<String,String> entry : getAllPropertiesWithPrefix(
+        Property.TSERV_SCAN_EXECUTORS_PREFIX).entrySet()) {
+
+      String suffix = entry.getKey()
+          .substring(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey().length());
+      String[] tokens = suffix.split("\\.", 2);
+      String name = tokens[0];
+
+      propsByName.computeIfAbsent(name, k -> new HashMap<>()).put(tokens[1], entry.getValue());
+    }
+
+    for (Entry<String,Map<String,String>> entry : propsByName.entrySet()) {
+      String name = entry.getKey();
+      Integer threads = null;
+      Integer prio = null;
+      String prioritizerClass = null;
+      Map<String,String> prioritizerOpts = new HashMap<>();
+
+      for (Entry<String,String> subEntry : entry.getValue().entrySet()) {
+        String opt = subEntry.getKey();
+        String val = subEntry.getValue();
+
+        if (opt.equals(SCAN_EXEC_THREADS)) {
+          Integer depThreads = getDeprecatedScanThreads(name);
+          if (depThreads == null) {
+            threads = Integer.parseInt(val);
+          } else {
+            threads = depThreads;
+          }
+        } else if (opt.equals(SCAN_EXEC_PRIORITY)) {
+          prio = Integer.parseInt(val);
+        } else if (opt.equals(SCAN_EXEC_PRIORITIZER)) {
+          prioritizerClass = val;
+        } else if (opt.startsWith(SCAN_EXEC_PRIORITIZER_OPTS)) {
+          String key = opt.substring(SCAN_EXEC_PRIORITIZER_OPTS.length());
+          if (key.isEmpty()) {
+            throw new IllegalStateException("Invalid scan executor option : " + opt);
+          }
+          prioritizerOpts.put(key, val);
+        } else {
+          throw new IllegalStateException("Unkown scan executor option : " + opt);
+        }
+      }
+
+      Preconditions.checkArgument(threads != null && threads > 0,
+          "Scan resource %s incorrectly specified threads", name);
+
+      scanResources.add(new ScanExecutorConfig(name, threads,
+          prio == null ? OptionalInt.empty() : OptionalInt.of(prio),
+          Optional.ofNullable(prioritizerClass), prioritizerOpts));
+    }
+
+    return scanResources;
+  }
+
   /**
    * Invalidates the <code>ZooCache</code> used for storage and quick retrieval of properties for
    * this configuration.
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
index fc2891e..54b87cc 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
@@ -52,4 +52,9 @@ public class DefaultConfiguration extends AccumuloConfiguration {
     resolvedProps.entrySet().stream().filter(p -> filter.test(p.getKey()))
         .forEach(e -> props.put(e.getKey(), e.getValue()));
   }
+
+  @Override
+  public boolean isPropertySet(Property prop) {
+    return false;
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 98402ef..b7ff210 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -30,6 +30,9 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
 import org.apache.accumulo.core.util.interpret.DefaultScanInterpreter;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
@@ -430,14 +433,37 @@ public enum Property {
       "When a tablet server's SimpleTimer thread triggers to check idle"
           + " sessions, this configurable option will be used to evaluate update"
           + " sessions to determine if they can be closed due to inactivity"),
+  @Deprecated
   TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT,
-      "The maximum number of concurrent read ahead that will execute. This effectively"
-          + " limits the number of long running scans that can run concurrently per tserver."),
-  TSERV_READ_AHEAD_PREFIX("tserver.readahead.concurrent.table.", null, PropertyType.PREFIX,
-      "Properties in this category allow overriding of table specific read ahead pools"),
+      "This property is deprecated since 2.0.0, use tserver.scan.executors.default.threads "
+          + "instead. The maximum number of concurrent read ahead that will execute. This "
+          + "effectively limits the number of long running scans that can run concurrently "
+          + "per tserver.\""),
+  @Deprecated
   TSERV_METADATA_READ_AHEAD_MAXCONCURRENT("tserver.metadata.readahead.concurrent.max", "8",
       PropertyType.COUNT,
-      "The maximum number of concurrent metadata read ahead that will execute."),
+      "This property is deprecated since 2.0.0, use tserver.scan.executors.meta.threads instead. "
+          + "The maximum number of concurrent metadata read ahead that will execute."),
+  TSERV_SCAN_EXECUTORS_PREFIX("tserver.scan.executors.", null, PropertyType.PREFIX,
+      "Prefix for defining executors to service scans.  For each executor the number of threads, "
+          + "thread priority, and an optional prioritizer can be configured.  The prioritizer "
+          + "determines which scan an executor should run first and must implement "
+          + ScanPrioritizer.class.getName() + ". Tables can select an executor by setting"
+          + " table.scan.dispatcher. To configure a new executor, set "
+          + "tserver.scan.executors.<name>.threads=<number>.  Optionally, can also set "
+          + "tserver.scan.executors.<name>.priority=<number 1 to 10>, "
+          + "tserver.scan.executors.<name>.prioritizer=<class name>, and "
+          + "tserver.scan.executors.<name>.prioritizer.opts.<key>=<value>"),
+  TSERV_SCAN_EXECUTORS_DEFAULT_THREADS("tserver.scan.executors.default.threads", "16",
+      PropertyType.COUNT,
+      "The number of threads for the scan executor that tables use by default."),
+  TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER("tserver.scan.executors.default.prioritizer", "",
+      PropertyType.STRING,
+      "Prioritizer for the default scan executor.  Defaults to none which "
+          + "results in FIFO priority.  Set to a class that implements "
+          + ScanPrioritizer.class.getName() + " to configure one."),
+  TSERV_SCAN_EXECUTORS_META_THREADS("tserver.scan.executors.meta.threads", "8", PropertyType.COUNT,
+      "The number of threads for the metadata table scan executor."),
   TSERV_MIGRATE_MAXCONCURRENT("tserver.migrations.concurrent.max", "1", PropertyType.COUNT,
       "The maximum number of concurrent tablet migrations for a tablet server"),
   TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT,
@@ -541,9 +567,6 @@ public enum Property {
   TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", PropertyType.COUNT,
       "The number of threads on each tablet server available to retrieve"
           + " summary data, that is not currently in cache, from RFiles."),
-  TSERV_SESSION_COMPARATOR_CLASS("tserver.summary.comparator.class", "", PropertyType.CLASSNAME,
-      "A customizable Scan session comparator. Note that by default, the value is empty"
-          + " and thus uses no session comparator"),
 
   // accumulo garbage collector properties
   GC_PREFIX("gc.", null, PropertyType.PREFIX,
@@ -669,6 +692,19 @@ public enum Property {
       PropertyType.BYTES,
       "The max RFile size used for a merging minor compaction. The default"
           + " value of 0 disables a max file size."),
+  TABLE_SCAN_DISPATCHER("table.scan.dispatcher", SimpleScanDispatcher.class.getName(),
+      PropertyType.CLASSNAME,
+      "This class is used to dynamically dispatch scans to configured scan executors.  This setting"
+          + " defaults to " + SimpleScanDispatcher.class.getSimpleName()
+          + " which dispatches to an executor"
+          + " named 'default' when it is optionless. Setting the option "
+          + "'table.scan.dispatcher.opts.executor=<name>' causes "
+          + SimpleScanDispatcher.class.getSimpleName() + " to dispatch to the specified executor. "
+          + "It has more options listed in its javadoc. Configured classes must implement "
+          + ScanDispatcher.class.getName() + ".  This property is ignored for the root and metadata"
+          + " table.  The metadata table always dispatches to a scan executor named `meta`."),
+  TABLE_SCAN_DISPATCHER_OPTS("table.scan.dispatcher.opts.", null, PropertyType.PREFIX,
+      "Options for the table scan dispatcher"),
   TABLE_SCAN_MAXMEM("table.scan.max.memory", "512K", PropertyType.BYTES,
       "The maximum amount of memory that will be used to cache results of a client query/scan. "
           + "Once this limit is reached, the buffered data is sent to the client."),
@@ -1153,7 +1189,8 @@ public enum Property {
             || key.startsWith(Property.TABLE_REPLICATION_TARGET.getKey())
             || key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey())
             || key.startsWith(TABLE_SAMPLER_OPTS.getKey())
-            || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())));
+            || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())
+            || key.startsWith(TABLE_SCAN_DISPATCHER_OPTS.getKey())));
   }
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
index 001b1db..cf0fe25 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * An {@link AccumuloConfiguration} which first loads any properties set on the command-line (using
  * the -o option) and then from an XML file, usually accumulo-site.xml. This implementation supports
@@ -165,6 +167,14 @@ public class SiteConfiguration extends AccumuloConfiguration {
   }
 
   @Override
+  public boolean isPropertySet(Property prop) {
+    Preconditions.checkArgument(!prop.isSensitive(),
+        "This method not implemented for sensitive props");
+    return CliConfiguration.get(prop) != null || staticConfigs.containsKey(prop.getKey())
+        || getXmlConfig().get(prop.getKey()) != null || parent.isPropertySet(prop);
+  }
+
+  @Override
   public void getProperties(Map<String,String> props, Predicate<String> filter) {
     getProperties(props, filter, true);
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
similarity index 58%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
rename to core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
index 28e6ef2..40f05e5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
@@ -14,23 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.tserver.session;
+package org.apache.accumulo.core.spi.common;
 
-public class SingleRangePriorityComparator extends DefaultSessionComparator {
+import java.util.Map;
 
-  @Override
-  public int compareSession(Session sessionA, Session sessionB) {
-    int priority = super.compareSession(sessionA, sessionB);
+/**
+ * Provides information about a configured Accumulo Iterator
+ *
+ * @since 2.0.0
+ */
+public interface IteratorConfiguration {
+  String getIteratorClass();
+
+  String getName();
+
+  int getPriority();
 
-    if (sessionA instanceof MultiScanSession && sessionB instanceof ScanSession) {
-      if (priority < 0) {
-        priority *= -1;
-      }
-    } else if (sessionB instanceof MultiScanSession && sessionA instanceof ScanSession) {
-      if (priority > 0) {
-        priority *= -1;
-      }
-    }
-    return priority;
-  }
+  Map<String,String> getOptions();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
similarity index 58%
copy from core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
copy to core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
index f688010..a1b9322 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
@@ -14,21 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.core.util;
+package org.apache.accumulo.core.spi.common;
 
-import java.lang.Thread.UncaughtExceptionHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * @since 2.0.0
+ */
+public interface Stats {
 
-public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler {
+  /**
+   * @return the minimum data point seen, or 0 if no data was seen
+   */
+  long min();
 
-  private static final Logger log = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+  /**
+   * @return the maximum data point seen, or 0 if no data was seen
+   */
+  long max();
 
-  @Override
-  public void uncaughtException(Thread t, Throwable e) {
+  /**
+   * @return the mean of the data points seen, or {@link Double#NaN} if no data was seen
+   */
+  double mean();
 
-    log.error(String.format("Caught an exception in %s.  Shutting down.", t), e);
-  }
+  /**
+   * @return the sum of the data points seen, or 0 if no data was seen
+   */
+  long sum();
 
+  /**
+   * @return the number of data points seen
+   */
+  long num();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
new file mode 100644
index 0000000..c567460
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Prioritize scans based on the ratio of runTime/idleTime. Scans with a lower ratio have a higher
+ * priority. When the ratio is equal, the scan with the oldest last run time has the highest
+ * priority. If neither have run, then the oldest gets priority.
+ *
+ * @since 2.0.0
+ */
+public class IdleRatioScanPrioritizer implements ScanPrioritizer {
+  private static double idleRatio(long currTime, ScanInfo si) {
+    double totalRunTime = si.getRunTimeStats().sum();
+    double totalIdleTime = Math.max(1, si.getIdleTimeStats(currTime).sum());
+    return totalRunTime / totalIdleTime;
+  }
+
+  @Override
+  public Comparator<ScanInfo> createComparator(Map<String,String> options) {
+    Preconditions.checkArgument(options.isEmpty());
+
+    Comparator<ScanInfo> c1 = (si1, si2) -> {
+      long currTime = System.currentTimeMillis();
+      return Double.compare(idleRatio(currTime, si1), idleRatio(currTime, si2));
+    };
+
+    return c1.thenComparingLong(si -> si.getLastRunTime().orElse(0))
+        .thenComparingLong(si -> si.getCreationTime());
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
new file mode 100644
index 0000000..a3bc3f1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A per table scan dispatcher that decides which executor should be used to process a scan. For
+ * information about configuring, find the documentation for the {@code table.scan.dispatcher} and
+ * {@code table.scan.dispatcher.opts.} properties.
+ *
+ * @since 2.0.0
+ */
+public interface ScanDispatcher {
+  /**
+   * Called once after instantiation. The default implementation rejects non-empty options.
+   *
+   * @param options
+   *          The configured options. For example if the table properties
+   *          {@code table.scan.dispatcher.opts.p1=abc} and
+   *          {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain
+   *          {@code p1=abc} and {@code p9=123}.
+   */
+  public default void init(Map<String,String> options) {
+    Preconditions.checkArgument(options.isEmpty(), "No options expected");
+  }
+
+  /**
+   * @param scanInfo
+   *          Information about the scan.
+   * @param scanExecutors
+   *          Information about the currently configured executors.
+   * @return the name of one of the executors in {@code scanExecutors.keySet()}
+   */
+  String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors);
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
new file mode 100644
index 0000000..a55b090
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for obtaining information about a scan executor
+ *
+ * @since 2.0.0
+ */
+public interface ScanExecutor {
+
+  interface Config {
+    /**
+     * @return the unique name used to identify the executor in config
+     */
+    String getName();
+
+    /**
+     * @return the max number of threads that were configured
+     */
+    int getMaxThreads();
+
+    /**
+     * @return the prioritizer that was configured
+     */
+    Optional<String> getPrioritizerClass();
+
+    /**
+     * @return the prioritizer options
+     */
+    Map<String,String> getPrioritizerOptions();
+  }
+
+  /**
+   * @return The number of tasks queued for the executor
+   */
+  int getQueued();
+
+  /**
+   * @return The configuration used to create the executor
+   */
+  Config getConfig();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
new file mode 100644
index 0000000..7961c21
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+
+/**
+ * Provides information about an active Accumulo scan against a tablet. Accumulo scans operate by
+ * repeatedly gathering batches of data and returning those to the client.
+ *
+ * <p>
+ * All times are in milliseconds and obtained using System.currentTimeMillis().
+ *
+ * @since 2.0.0
+ */
+public interface ScanInfo {
+
+  enum Type {
+    /**
+     * A single range scan started using a {@link Scanner}
+     */
+    SINGLE,
+    /**
+     * A multi range scan started using a {@link BatchScanner}
+     */
+    MULTI
+  }
+
+  Type getScanType();
+
+  String getTableId();
+
+  /**
+   * Returns the first time a tablet knew about a scan over its portion of data.
+   */
+  long getCreationTime();
+
+  /**
+   * If the scan has run, returns the last run time.
+   */
+  OptionalLong getLastRunTime();
+
+  /**
+   * Returns timing statistics about running and gathering batches of data.
+   */
+  Stats getRunTimeStats();
+
+  /**
+   * Returns statistics about the time between running. These stats are only about the idle times
+   * before the last run time. The idle time after the last run time is not included. If the scan
+   * has never run, then there are no stats.
+   */
+  Stats getIdleTimeStats();
+
+  /**
+   * This method is similar to {@link #getIdleTimeStats()}, but it also includes the time period
+   * between the last run time and now in the stats. If the scan has never run, then the stats are
+   * computed using only {@code currentTime - creationTime}.
+   */
+  Stats getIdleTimeStats(long currentTime);
+
+  /**
+   * This method returns which columns were fetched by a scan. When a family is fetched, a Column
+   * object where everything but the family is null is in the set.
+   *
+   * <p>
+   * The following example code shows how this method can be used to check if a family was fetched
+   * or a family+qualifier was fetched. If continually checking for the same column, consider
+   * creating a constant for it.
+   *
+   * <pre>
+   * <code>
+   *   boolean wasFamilyFetched(ScanInfo si, byte[] fam) {
+   *     Column family = new Column(fam, null, null);
+   *     return si.getFetchedColumns().contains(family);
+   *   }
+   *
+   *   boolean wasColumnFetched(ScanInfo si, byte[] fam, byte[] qual) {
+   *     Column col = new Column(fam, qual, null);
+   *     return si.getFetchedColumns().contains(col);
+   *   }
+   * </code>
+   * </pre>
+   *
+   *
+   * @return The family and family+qualifier pairs fetched.
+   */
+  Set<Column> getFetchedColumns();
+
+  /**
+   * @return iterators that were configured on the client side scanner
+   */
+  Collection<IteratorConfiguration> getClientScanIterators();
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
similarity index 65%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
rename to core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
index dcfa1d4..51a4254 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
@@ -14,19 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.tserver.session;
+package org.apache.accumulo.core.spi.scan;
 
 import java.util.Comparator;
+import java.util.Map;
 
-public abstract class SessionComparator implements Comparator<Runnable> {
-
-  @Override
-  public int compare(Runnable sessionA, Runnable sessionB) {
-    if (sessionA instanceof Session && sessionB instanceof Session)
-      return compareSession((Session) sessionA, (Session) sessionB);
-    else
-      return 0;
-  }
-
-  public abstract int compareSession(final Session sessionA, final Session sessionB);
+/**
+ * A factory for creating comparators used for prioritizing scans. For information about
+ * configuring, find the documentation for the {@code tserver.scan.executors.} property.
+ *
+ * @since 2.0.0
+ */
+public interface ScanPrioritizer {
+  Comparator<ScanInfo> createComparator(Map<String,String> options);
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
new file mode 100644
index 0000000..96e7d2c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * If no options are given, then this will dispatch to an executor named {@code default}. This
+ * dispatcher supports the following options.
+ *
+ * <UL>
+ * <LI>{@code table.scan.dispatcher.opts.executor=<scan executor name>} : dispatches all scans to
+ * the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.multi_executor=<scan executor name>} : dispatches batch
+ * scans to the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>} : dispatches regular
+ * scans to the named executor.</LI>
+ * </UL>
+ *
+ * The {@code multi_executor} and {@code single_executor} options override the {@code executor}
+ * option.
+ */
+
+public class SimpleScanDispatcher implements ScanDispatcher {
+
+  // Option names accepted by init(); shared by all instances, so held in a static constant.
+  private static final Set<String> VALID_OPTS = ImmutableSet.of("executor", "multi_executor",
+      "single_executor");
+  private String multiExecutor;
+  private String singleExecutor;
+
+  public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
+
+  @Override
+  public void init(Map<String,String> options) {
+    Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
+    Preconditions.checkArgument(invalidOpts.isEmpty(), "Invalid options : %s", invalidOpts);
+
+    String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
+    multiExecutor = options.getOrDefault("multi_executor", base);
+    singleExecutor = options.getOrDefault("single_executor", base);
+  }
+
+  @Override
+  public String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors) {
+    switch (scanInfo.getScanType()) {
+      case MULTI:
+        return multiExecutor;
+      case SINGLE:
+        return singleExecutor;
+      default:
+        throw new IllegalArgumentException("Unexpected scan type " + scanInfo.getScanType());
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
index f688010..ed9c150 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
@@ -27,7 +27,6 @@ public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandle
 
   @Override
   public void uncaughtException(Thread t, Throwable e) {
-
     log.error(String.format("Caught an exception in %s.  Shutting down.", t), e);
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
index 0b4730c..57d429b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.util;
 
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.OptionalInt;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -26,20 +28,30 @@ import org.slf4j.LoggerFactory;
 public class NamingThreadFactory implements ThreadFactory {
   private static final Logger log = LoggerFactory.getLogger(NamingThreadFactory.class);
 
-  private static final AccumuloUncaughtExceptionHandler uncaughtHandler = new AccumuloUncaughtExceptionHandler();
+  private static final UncaughtExceptionHandler UEH = new AccumuloUncaughtExceptionHandler();
 
   private AtomicInteger threadNum = new AtomicInteger(1);
   private String name;
+  private OptionalInt priority;
 
   public NamingThreadFactory(String name) {
     this.name = name;
+    this.priority = OptionalInt.empty();
+  }
+
+  public NamingThreadFactory(String name, OptionalInt priority) {
+    this.name = name;
+    this.priority = priority;
   }
 
   @Override
   public Thread newThread(Runnable r) {
     Thread thread = new Daemon(new LoggingRunnable(log, r),
         name + " " + threadNum.getAndIncrement());
-    thread.setUncaughtExceptionHandler(uncaughtHandler);
+    thread.setUncaughtExceptionHandler(UEH);
+    if (priority.isPresent()) {
+      thread.setPriority(priority.getAsInt());
+    }
     return thread;
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Stat.java b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
index 8bdeb63..704622b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Stat.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
@@ -16,67 +16,70 @@
  */
 package org.apache.accumulo.core.util;
 
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.accumulo.core.spi.common.Stats;
 import org.apache.commons.math3.stat.descriptive.moment.Mean;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.rank.Max;
-import org.apache.commons.math3.stat.descriptive.rank.Min;
-import org.apache.commons.math3.stat.descriptive.summary.Sum;
 
-public class Stat {
-  Min min;
-  Max max;
-  Sum sum;
+public class Stat implements Stats {
+  long min;
+  long max;
+  long sum;
   Mean mean;
-  StandardDeviation sd;
-
-  StorelessUnivariateStatistic[] stats;
 
   public Stat() {
-    min = new Min();
-    max = new Max();
-    sum = new Sum();
     mean = new Mean();
-    sd = new StandardDeviation();
-
-    stats = new StorelessUnivariateStatistic[] {min, max, sum, mean, sd};
+    clear();
   }
 
   public void addStat(long stat) {
-    for (StorelessUnivariateStatistic statistic : stats) {
-      statistic.increment(stat);
-    }
+    min = Math.min(min, stat);
+    max = Math.max(max, stat);
+    sum += stat;
+    mean.increment(stat);
   }
 
-  public long getMin() {
-    return (long) min.getResult();
+  @Override
+  public long min() {
+    return num() == 0 ? 0L : min;
   }
 
-  public long getMax() {
-    return (long) max.getResult();
+  @Override
+  public long max() {
+    return num() == 0 ? 0L : max;
   }
 
-  public long getSum() {
-    return (long) sum.getResult();
+  @Override
+  public long sum() {
+    return sum;
   }
 
-  public double getAverage() {
+  @Override
+  public double mean() {
     return mean.getResult();
   }
 
-  public double getStdDev() {
-    return sd.getResult();
-  }
-
   @Override
   public String toString() {
-    return String.format("%,d %,d %,.2f %,d", getMin(), getMax(), getAverage(), mean.getN());
+    return String.format("%,d %,d %,.2f %,d", min(), max(), mean(), mean.getN());
   }
 
   public void clear() {
-    for (StorelessUnivariateStatistic statistic : stats) {
-      statistic.clear();
-    }
+    min = Long.MAX_VALUE;
+    max = Long.MIN_VALUE;
+    sum = 0;
+    mean.clear();
+  }
+
+  @Override
+  public long num() {
+    return mean.getN();
   }
 
+  public Stat copy() {
+    Stat stat = new Stat();
+    stat.min = this.min;
+    stat.max = this.max;
+    stat.sum = this.sum;
+    stat.mean = this.mean.copy();
+    return stat;
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
index 9f77834..0fcfebd 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
@@ -21,11 +21,15 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.function.Predicate;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -131,6 +135,15 @@ public class AccumuloConfigurationTest {
 
     private HashMap<String,String> props = new HashMap<>();
     private int upCount = 0;
+    private AccumuloConfiguration parent;
+
+    TestConfiguration() {
+      parent = null;
+    }
+
+    TestConfiguration(AccumuloConfiguration parent) {
+      this.parent = parent;
+    }
 
     public void set(String p, String v) {
       props.put(p, v);
@@ -138,17 +151,29 @@ public class AccumuloConfigurationTest {
     }
 
     @Override
+    public boolean isPropertySet(Property prop) {
+      return props.containsKey(prop.getKey());
+    }
+
+    @Override
     public long getUpdateCount() {
       return upCount;
     }
 
     @Override
     public String get(Property property) {
-      return props.get(property.getKey());
+      String v = props.get(property.getKey());
+      if (v == null && parent != null) { // not set locally, so defer to the parent
+        v = parent.get(property);
+      }
+      return v;
     }
 
     @Override
     public void getProperties(Map<String,String> output, Predicate<String> filter) {
+      if (parent != null) {
+        parent.getProperties(output, filter);
+      }
       for (Entry<String,String> entry : props.entrySet()) {
         if (filter.test(entry.getKey())) {
           output.put(entry.getKey(), entry.getValue());
@@ -267,4 +292,89 @@ public class AccumuloConfigurationTest {
     Map<String,String> pmL = tc.getAllPropertiesWithPrefix(Property.TABLE_ITERATOR_SCAN_PREFIX);
     assertSame(pmG, pmL);
   }
+
+  @Test
+  public void testScanExecutors() {
+    String defName = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+    TestConfiguration tc = new TestConfiguration(DefaultConfiguration.getInstance());
+
+    Collection<ScanExecutorConfig> executors = tc.getScanExecutors();
+
+    Assert.assertEquals(2, executors.size());
+
+    ScanExecutorConfig sec = executors.stream().filter(c -> c.name.equals(defName)).findFirst()
+        .get();
+    Assert.assertEquals(
+        Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    Assert.assertFalse(sec.priority.isPresent());
+    Assert.assertTrue(sec.prioritizerClass.get().isEmpty());
+    Assert.assertTrue(sec.prioritizerOpts.isEmpty());
+
+    // ensure the deprecated prop is read if nothing else is set
+    tc.set("tserver.readahead.concurrent.max", "6");
+    Assert.assertEquals(6, sec.getCurrentMaxThreads());
+    Assert.assertEquals(
+        Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    ScanExecutorConfig sec2 = tc.getScanExecutors().stream().filter(c -> c.name.equals(defName))
+        .findFirst().get();
+    Assert.assertEquals(6, sec2.maxThreads);
+
+    // ensure the new prop overrides the deprecated prop
+    tc.set(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey(), "9");
+    Assert.assertEquals(9, sec.getCurrentMaxThreads());
+    Assert.assertEquals(
+        Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    ScanExecutorConfig sec3 = tc.getScanExecutors().stream().filter(c -> c.name.equals(defName))
+        .findFirst().get();
+    Assert.assertEquals(9, sec3.maxThreads);
+
+    ScanExecutorConfig sec4 = executors.stream().filter(c -> c.name.equals("meta")).findFirst()
+        .get();
+    Assert.assertEquals(
+        Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getDefaultValue()),
+        sec4.maxThreads);
+    Assert.assertFalse(sec4.priority.isPresent());
+    Assert.assertFalse(sec4.prioritizerClass.isPresent());
+    Assert.assertTrue(sec4.prioritizerOpts.isEmpty());
+
+    tc.set("tserver.metadata.readahead.concurrent.max", "2");
+    Assert.assertEquals(2, sec4.getCurrentMaxThreads());
+    ScanExecutorConfig sec5 = tc.getScanExecutors().stream().filter(c -> c.name.equals("meta"))
+        .findFirst().get();
+    Assert.assertEquals(2, sec5.maxThreads);
+
+    tc.set(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getKey(), "3");
+    Assert.assertEquals(3, sec4.getCurrentMaxThreads());
+    ScanExecutorConfig sec6 = tc.getScanExecutors().stream().filter(c -> c.name.equals("meta"))
+        .findFirst().get();
+    Assert.assertEquals(3, sec6.maxThreads);
+
+    String prefix = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey();
+    tc.set(prefix + "hulksmash.threads", "66");
+    tc.set(prefix + "hulksmash.priority", "3");
+    tc.set(prefix + "hulksmash.prioritizer", "com.foo.ScanPrioritizer");
+    tc.set(prefix + "hulksmash.prioritizer.opts.k1", "v1");
+    tc.set(prefix + "hulksmash.prioritizer.opts.k2", "v3");
+
+    executors = tc.getScanExecutors();
+    Assert.assertEquals(3, executors.size());
+    ScanExecutorConfig sec7 = executors.stream().filter(c -> c.name.equals("hulksmash")).findFirst()
+        .get();
+    Assert.assertEquals(66, sec7.maxThreads);
+    Assert.assertEquals(3, sec7.priority.getAsInt());
+    Assert.assertEquals("com.foo.ScanPrioritizer", sec7.prioritizerClass.get());
+    Assert.assertEquals(ImmutableMap.of("k1", "v1", "k2", "v3"), sec7.prioritizerOpts);
+
+    tc.set(prefix + "hulksmash.threads", "44");
+    Assert.assertEquals(66, sec7.maxThreads);
+    Assert.assertEquals(44, sec7.getCurrentMaxThreads());
+
+    ScanExecutorConfig sec8 = tc.getScanExecutors().stream().filter(c -> c.name.equals("hulksmash"))
+        .findFirst().get();
+    Assert.assertEquals(44, sec8.maxThreads);
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
new file mode 100644
index 0000000..0128b20
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IdleRatioScanPrioritizerTest {
+
+  @Test
+  public void testSort() {
+    final long start = System.currentTimeMillis();
+    final Type single = Type.SINGLE;
+
+    List<TestScanInfo> infos = new ArrayList<>();
+
+    // "a" and "b" have never run, so the older of the two is expected first
+    infos.add(new TestScanInfo("a", single, start - 3));
+    infos.add(new TestScanInfo("b", single, start - 8));
+    // "c" and "d" share a last run time but differ in idle ratio
+    infos.add(new TestScanInfo("c", single, start - 16, 2, 10));
+    infos.add(new TestScanInfo("d", single, start - 16, 5, 10));
+    // "e" and "f" share an idle ratio but differ in last run time
+    infos.add(new TestScanInfo("e", single, start - 12, 5, 9));
+    infos.add(new TestScanInfo("f", single, start - 12, 3, 7));
+
+    // randomize the input so the assertions below depend only on the comparator
+    Collections.shuffle(infos);
+
+    IdleRatioScanPrioritizer prioritizer = new IdleRatioScanPrioritizer();
+    Comparator<ScanInfo> byPriority = prioritizer.createComparator(Collections.emptyMap());
+    Collections.sort(infos, byPriority);
+
+    // test ids in the order the comparator is expected to produce
+    String[] expectedOrder = {"b", "a", "f", "e", "d", "c"};
+    for (int i = 0; i < expectedOrder.length; i++) {
+      Assert.assertEquals(expectedOrder[i], infos.get(i).testId);
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
new file mode 100644
index 0000000..b59858a
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SimpleScanDispatcherTest {
+  @Test
+  public void testProps() {
+    String defaultName = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+    String threadsKey = Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey();
+    Assert.assertTrue(threadsKey.endsWith(defaultName + ".threads"));
+    String prioritizerKey = Property.TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER.getKey();
+    Assert.assertTrue(prioritizerKey.endsWith(defaultName + ".prioritizer"));
+  }
+
+  // Initializes a dispatcher with opts and checks where multi and single scans are sent.
+  private void runTest(Map<String,String> opts, String expectedSingle, String expectedMulti) {
+    SimpleScanDispatcher dispatcher = new SimpleScanDispatcher();
+    dispatcher.init(opts);
+    ScanInfo batchScan = new TestScanInfo("a", Type.MULTI, 4);
+    Assert.assertEquals(expectedMulti, dispatcher.dispatch(batchScan, null));
+    ScanInfo singleScan = new TestScanInfo("a", Type.SINGLE, 4);
+    Assert.assertEquals(expectedSingle, dispatcher.dispatch(singleScan, null));
+  }
+
+  @Test
+  public void testBasic() {
+    final String def = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+    runTest(Collections.emptyMap(), def, def);
+    runTest(ImmutableMap.of("executor", "E1"), "E1", "E1");
+    runTest(ImmutableMap.of("single_executor", "E2"), "E2", def);
+    runTest(ImmutableMap.of("multi_executor", "E3"), def, "E3");
+    runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2"), "E2", "E1");
+    runTest(ImmutableMap.of("executor", "E1", "multi_executor", "E3"), "E1", "E3");
+    runTest(ImmutableMap.of("single_executor", "E2", "multi_executor", "E3"), "E2", "E3");
+    runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2", "multi_executor", "E3"),
+        "E2", "E3");
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
new file mode 100644
index 0000000..68a9650
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.util.Stat;
+
+public class TestScanInfo implements ScanInfo { // ScanInfo stub for tests; table id, fetched columns and client iterators are unsupported
+
+  String testId;
+  Type scanType;
+  long creationTime;
+  OptionalLong lastRunTime = OptionalLong.empty(); // stays empty unless the constructor is given run times
+  Stat runTimeStats = new Stat();
+  Stat idleTimeStats = new Stat();
+
+  TestScanInfo(String testId, Type scanType, long creationTime, int... times) { // times: alternating run start/end offsets (start0, end0, start1, end1, ...)
+    this.testId = testId;
+    this.scanType = scanType;
+    this.creationTime = creationTime;
+
+    for (int i = 0; i < times.length; i += 2) { // consumes pairs; NOTE(review): an odd number of times would index past the array at times[i + 1]
+      long idleDuration = times[i] - (i == 0 ? 0 : times[i - 1]); // gap since the previous run ended (or since offset 0 for the first run)
+      long runDuration = times[i + 1] - times[i]; // end offset minus start offset of this run
+      runTimeStats.addStat(runDuration);
+      idleTimeStats.addStat(idleDuration);
+    }
+
+    if (times.length > 0) {
+      lastRunTime = OptionalLong.of(times[times.length - 1] + creationTime); // last offset shifted by creationTime
+    }
+  }
+
+  @Override
+  public Type getScanType() {
+    return scanType;
+  }
+
+  @Override
+  public String getTableId() { // not supported by this stub
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  @Override
+  public OptionalLong getLastRunTime() {
+    return lastRunTime;
+  }
+
+  @Override
+  public Stats getRunTimeStats() {
+    return runTimeStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats() { // idle stats as recorded by the constructor, without any trailing idle period
+    return idleTimeStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats(long currentTime) { // recorded idle stats plus one extra sample for the idle period ending at currentTime
+    Stat copy = idleTimeStats.copy(); // presumably leaves idleTimeStats itself unmodified; confirm against Stat.copy()
+    copy.addStat(currentTime - lastRunTime.orElse(creationTime)); // measured from the last run, or from creation if there was none
+    return copy;
+  }
+
+  @Override
+  public Set<Column> getFetchedColumns() { // not supported by this stub
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<IteratorConfiguration> getClientScanIterators() { // not supported by this stub
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
index 69df38d..a2986ac 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
@@ -43,32 +43,26 @@ public class StatTest {
 
   @Test
   public void testGetMin() {
-    assertEquals(0, zero.getMin());
-    assertEquals(3677, stat.getMin());
+    assertEquals(0, zero.min());
+    assertEquals(3677, stat.min());
   }
 
   @Test
   public void testGetMax() {
-    assertEquals(0, zero.getMax());
-    assertEquals(9792, stat.getMax());
+    assertEquals(0, zero.max());
+    assertEquals(9792, stat.max());
   }
 
   @Test
   public void testGetAverage() {
-    assertEquals(0, zero.getAverage(), delta);
-    assertEquals(5529, stat.getAverage(), delta);
-  }
-
-  @Test
-  public void testGetStdDev() {
-    assertEquals(0, zero.getStdDev(), delta);
-    assertEquals(2073.7656569632, stat.getStdDev(), delta);
+    assertEquals(0, zero.mean(), delta);
+    assertEquals(5529, stat.mean(), delta);
   }
 
   @Test
   public void testGetSum() {
-    assertEquals(0, zero.getSum());
-    assertEquals(38703, stat.getSum());
+    assertEquals(0, zero.sum());
+    assertEquals(38703, stat.sum());
   }
 
   @Test
@@ -76,16 +70,14 @@ public class StatTest {
     zero.clear();
     stat.clear();
 
-    assertEquals(0, zero.getMax());
-    assertEquals(zero.getMax(), stat.getMax());
-    assertEquals(0, zero.getMin());
-    assertEquals(zero.getMin(), stat.getMin());
-    assertEquals(0, zero.getSum());
-    assertEquals(zero.getSum(), stat.getSum());
+    assertEquals(0, zero.max());
+    assertEquals(zero.max(), stat.max());
+    assertEquals(0, zero.min());
+    assertEquals(zero.min(), stat.min());
+    assertEquals(0, zero.sum());
+    assertEquals(zero.sum(), stat.sum());
 
-    assertEquals(Double.NaN, zero.getAverage(), 0);
-    assertEquals(zero.getAverage(), stat.getAverage(), 0);
-    assertEquals(Double.NaN, zero.getStdDev(), 0);
-    assertEquals(zero.getStdDev(), stat.getStdDev(), 0);
+    assertEquals(Double.NaN, zero.mean(), 0);
+    assertEquals(zero.mean(), stat.mean(), 0);
   }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index dbb7ca4..cc67feb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.conf;
 import static java.util.Objects.requireNonNull;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
@@ -216,4 +218,39 @@ public class TableConfiguration extends ObservableConfiguration {
 
     return pic;
   }
+
+  public static class TablesScanDispatcher { // immutable (dispatcher, count) pair; getScanDispatcher() stores getUpdateCount() as count
+    public final ScanDispatcher dispatcher;
+    public final long count;
+
+    public TablesScanDispatcher(ScanDispatcher dispatcher, long count) {
+      this.dispatcher = dispatcher;
+      this.count = count;
+    }
+  }
+
+  private AtomicReference<TablesScanDispatcher> scanDispatcherRef = new AtomicReference<>();
+
+  public ScanDispatcher getScanDispatcher() { // returns the cached dispatcher, rebuilding it when getUpdateCount() differs from the cached count
+    long count = getUpdateCount();
+    TablesScanDispatcher currRef = scanDispatcherRef.get();
+    if (currRef == null || currRef.count != count) { // nothing cached yet, or cached entry was built at a different update count
+      ScanDispatcher newDispatcher = Property.createTableInstanceFromPropertyName(this,
+          Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null); // NOTE(review): default instance is null; if this can return null, init() below throws NPE - confirm
+
+      Map<String,String> opts = new HashMap<>();
+      getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) -> {
+        String optKey = k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length()); // drop the options prefix, keep the option name
+        opts.put(optKey, v);
+      });
+
+      newDispatcher.init(Collections.unmodifiableMap(opts)); // dispatcher receives a read-only view of its options
+
+      TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher, count);
+      scanDispatcherRef.compareAndSet(currRef, newRef); // result ignored: if the ref changed meanwhile, the cache keeps the other value
+      currRef = newRef; // this call returns its own freshly built dispatcher either way
+    }
+
+    return currRef.dispatcher;
+  }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
index 1973fd8..ab41141 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@ -81,11 +81,12 @@ public class ZooConfiguration extends AccumuloConfiguration {
   @Override
   public String get(Property property) {
     if (Property.isFixedZooPropertyKey(property)) {
-      if (fixedProps.containsKey(property.getKey())) {
-        return fixedProps.get(property.getKey());
+      String val = fixedProps.get(property.getKey());
+      if (val != null) {
+        return val;
       } else {
         synchronized (fixedProps) {
-          String val = _get(property);
+          val = _get(property);
           fixedProps.put(property.getKey(), val);
           return val;
         }
@@ -96,6 +97,12 @@ public class ZooConfiguration extends AccumuloConfiguration {
     }
   }
 
+  @Override
+  public boolean isPropertySet(Prop­erty prop) { // true if the key is in fixedProps, has a raw value via getRaw, or is set in the parent config
+    return fixedProps.containsKey(prop.getKey()) || getRaw(prop.getKey()) != null
+        || parent.isPropertySet(prop); // parent is only consulted when the two local checks fail
+  }
+
   private String getRaw(String key) {
     String zPath = propPathPrefix + "/" + key;
     byte[] v = propCache.get(zPath);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6e67cf0..14c0306 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -130,6 +130,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
 import org.apache.accumulo.core.summary.Gatherer;
 import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
 import org.apache.accumulo.core.summary.SummaryCollection;
@@ -250,9 +251,9 @@ import org.apache.accumulo.tserver.scan.NextBatchTask;
 import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.session.ConditionalSession;
 import org.apache.accumulo.tserver.session.MultiScanSession;
-import org.apache.accumulo.tserver.session.ScanSession;
 import org.apache.accumulo.tserver.session.Session;
 import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
 import org.apache.accumulo.tserver.session.SummarySession;
 import org.apache.accumulo.tserver.session.UpdateSession;
 import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
@@ -543,6 +544,16 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       }
     }
 
+    private ScanDispatcher getScanDispatcher(KeyExtent extent) { // returns the table configuration's dispatcher for the extent's table, or null for root/metadata extents
+      if (extent.isRootTablet() || extent.isMeta()) {
+        // dispatcher is only for user tables
+        return null; // callers pass this null on to resourceManager.executeReadAhead
+      }
+
+      return getServerConfigurationFactory().getTableConfiguration(extent.getTableId())
+          .getScanDispatcher();
+    }
+
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
         TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
@@ -587,13 +598,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       if (tablet == null)
         throw new NotServingTabletException(textent);
 
-      Set<Column> columnSet = new HashSet<>();
+      HashSet<Column> columnSet = new HashSet<>();
       for (TColumn tcolumn : columns) {
         columnSet.add(new Column(tcolumn));
       }
 
-      final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio,
-          new Authorizations(authorizations), readaheadThreshold, batchTimeOut, context);
+      final SingleScanSession scanSession = new SingleScanSession(credentials, extent, columnSet,
+          ssiList, ssio, new Authorizations(authorizations), readaheadThreshold, batchTimeOut,
+          context);
       scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet,
           scanSession.auths, ssiList, ssio, isolated, scanSession.interruptFlag,
           SamplerConfigurationImpl.fromThrift(tSamplerConfig), scanSession.batchTimeOut,
@@ -619,7 +631,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
         throws NoSuchScanIDException, NotServingTabletException,
         org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
         TSampleNotPresentException {
-      ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID);
+      SingleScanSession scanSession = (SingleScanSession) sessionManager.reserveSession(scanID);
       if (scanSession == null) {
         throw new NoSuchScanIDException();
       }
@@ -631,7 +643,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       }
     }
 
-    private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession)
+    private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
         throws NoSuchScanIDException, NotServingTabletException,
         org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
         TSampleNotPresentException {
@@ -639,7 +651,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       if (scanSession.nextBatchTask == null) {
         scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID,
             scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+        resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
+            scanSession, scanSession.nextBatchTask);
       }
 
       ScanBatch bresult;
@@ -694,7 +707,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
         // to client
         scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID,
             scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+        resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
+            scanSession, scanSession.nextBatchTask);
       }
 
       if (!scanResult.more)
@@ -705,14 +719,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
     @Override
     public void closeScan(TInfo tinfo, long scanID) {
-      final ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
+      final SingleScanSession ss = (SingleScanSession) sessionManager.removeSession(scanID);
       if (ss != null) {
         long t2 = System.currentTimeMillis();
 
         if (log.isTraceEnabled()) {
           log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
               TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned,
-              (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
+              (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
         }
 
         if (scanMetrics.isEnabled()) {
@@ -818,7 +832,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
       if (session.lookupTask == null) {
         session.lookupTask = new LookupTask(TabletServer.this, scanID);
-        resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
+        resourceManager.executeReadAhead(session.threadPoolExtent,
+            getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
       }
 
       try {
@@ -1174,8 +1189,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
             String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
                 TServerUtils.clientAddress.get(), us.totalUpdates,
                 (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(),
-                us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
-                us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
+                us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0,
+                us.commitTimes.sum() / 1000.0));
       }
       if (us.failures.size() > 0) {
         Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 9a91e87..5ab6e27 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -21,10 +21,15 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Queue;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -36,12 +41,13 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
 
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.NamespaceNotFoundException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
@@ -50,6 +56,11 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheType;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanExecutor;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -67,15 +78,18 @@ import org.apache.accumulo.tserver.compaction.CompactionStrategy;
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
-import org.apache.accumulo.tserver.session.SessionComparator;
+import org.apache.accumulo.tserver.session.ScanSession;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.htrace.wrappers.TraceExecutorService;
+import org.apache.htrace.wrappers.TraceRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 
 /**
  * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
@@ -94,14 +108,13 @@ public class TabletServerResourceManager {
   private final ExecutorService migrationPool;
   private final ExecutorService assignmentPool;
   private final ExecutorService assignMetaDataPool;
-  private final ExecutorService readAheadThreadPool;
-  private final ExecutorService defaultReadAheadThreadPool;
   private final ExecutorService summaryRetrievalPool;
   private final ExecutorService summaryParitionPool;
   private final ExecutorService summaryRemotePool;
   private final Map<String,ExecutorService> threadPools = new TreeMap<>();
 
-  private final Map<String,ExecutorService> tableThreadPools = new TreeMap<>();
+  private final Map<String,ExecutorService> scanExecutors;
+  private final Map<String,ScanExecutor> scanExecutorChoices;
 
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
 
@@ -128,46 +141,15 @@ public class TabletServerResourceManager {
     return tp;
   }
 
-  private ExecutorService addEs(final Property maxThreads, String name,
-      final ThreadPoolExecutor tp) {
+  private ExecutorService addEs(IntSupplier maxThreads, String name, final ThreadPoolExecutor tp) {
     ExecutorService result = addEs(name, tp);
     SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() {
       @Override
       public void run() {
         try {
-          int max = tserver.getConfiguration().getCount(maxThreads);
+          int max = maxThreads.getAsInt();
           if (tp.getMaximumPoolSize() != max) {
-            log.info("Changing {} to {}", maxThreads.getKey(), max);
-            tp.setCorePoolSize(max);
-            tp.setMaximumPoolSize(max);
-          }
-        } catch (Throwable t) {
-          log.error("Failed to change thread pool size", t);
-        }
-      }
-
-    }, 1000, 10 * 1000);
-    return result;
-  }
-
-  private ExecutorService addEs(final int maxThreads, final Property prefix,
-      final String propertyName, String name, final ThreadPoolExecutor tp) {
-    ExecutorService result = addEs(name, tp);
-    SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          int max = maxThreads;
-          for (Entry<String,String> entry : conf.getSystemConfiguration()
-              .getAllPropertiesWithPrefix(prefix).entrySet()) {
-            if (entry.getKey().equals(propertyName)) {
-              if (null != entry.getValue() && entry.getValue().length() != 0)
-                max = Integer.parseInt(entry.getValue());
-              break;
-            }
-          }
-          if (tp.getMaximumPoolSize() != max) {
-            log.info("Changing {} to {}", maxThreads, max);
+            log.info("Changing max threads for {} to {}", name, max);
             tp.setCorePoolSize(max);
             tp.setMaximumPoolSize(max);
           }
@@ -187,7 +169,7 @@ public class TabletServerResourceManager {
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue,
         new NamingThreadFactory(name));
     tp.allowCoreThreadTimeOut(true);
-    return addEs(max, name, tp);
+    return addEs(() -> conf.getSystemConfiguration().getCount(max), name, tp);
   }
 
   private ExecutorService createEs(int max, String name) {
@@ -198,83 +180,52 @@ public class TabletServerResourceManager {
     return createEs(max, name, new LinkedBlockingQueue<>());
   }
 
-  private ExecutorService createEs(int maxThreads, Property prefix, String propertyName,
-      String name, BlockingQueue<Runnable> queue) {
-    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
-        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
-    return addEs(maxThreads, prefix, propertyName, name, tp);
-  }
+  private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
+      Map<String,Queue<?>> scanExecQueues) {
 
-  /**
-   * If we cannot instantiate the comparator we will default to the linked blocking queue comparator
-   * 
-   * @param max
-   *          max number of threads
-   * @param comparator
-   *          comparator property
-   * @param name
-   *          name passed to the thread factory
-   * @return priority executor
-   */
-  private ExecutorService createPriorityExecutor(Property prefix, String propertyName,
-      final int maxThreads, Property comparator, String name) {
-
-    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+    BlockingQueue<Runnable> queue;
 
-    if (null == comparatorClazz || comparatorClazz.length() == 0) {
-      log.debug("Using no comparator");
-      return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>());
+    if (sec.prioritizerClass.orElse("").isEmpty()) {
+      queue = new LinkedBlockingQueue<>();
     } else {
-      SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
-          conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
-      if (null != comparatorObj) {
-        log.debug("Using priority based scheduler {}", comparatorClazz);
-        return createEs(maxThreads, prefix, propertyName, name,
-            new PriorityBlockingQueue<>(maxThreads, comparatorObj));
-      } else {
-        log.debug("Using no comparator");
-        return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>());
+      ScanPrioritizer factory = null;
+      try {
+        factory = ConfigurationTypeHelper.getClassInstance(null, sec.prioritizerClass.get(),
+            ScanPrioritizer.class);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
-    }
-  }
 
-  /**
-   * If we cannot instantiate the comparator we will default to the linked blocking queue comparator
-   * 
-   * @param max
-   *          max number of threads
-   * @param comparator
-   *          comparator property
-   * @param name
-   *          name passed to the thread factory
-   * @return priority executor
-   */
-  private ExecutorService createPriorityExecutor(Property max, Property comparator, String name) {
-    int maxThreads = conf.getSystemConfiguration().getCount(max);
+      if (factory == null) {
+        queue = new LinkedBlockingQueue<>();
+      } else {
+        Comparator<ScanInfo> comparator = factory.createComparator(sec.prioritizerOpts);
 
-    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+        // function to extract scan session from runnable
+        Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) ((TraceRunnable) r)
+            .getRunnable()).getScanInfo();
 
-    if (null == comparatorClazz || comparatorClazz.length() == 0) {
-      log.debug("Using no comparator");
-      return createEs(max, name, new LinkedBlockingQueue<>());
-    } else {
-      SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
-          conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
-      if (null != comparatorObj) {
-        log.debug("Using priority based scheduler {}", comparatorClazz);
-        return createEs(max, name, new PriorityBlockingQueue<>(maxThreads, comparatorObj));
-      } else {
-        log.debug("Using no comparator");
-        return createEs(max, name, new LinkedBlockingQueue<>());
+        queue = new PriorityBlockingQueue<>(sec.maxThreads,
+            Comparator.comparing(extractor, comparator));
       }
     }
+
+    scanExecQueues.put(sec.name, queue);
+
+    return createEs(() -> sec.getCurrentMaxThreads(), "scan-" + sec.name, queue, sec.priority);
   }
 
-  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
-    int maxThreads = conf.getSystemConfiguration().getCount(max);
+  private ExecutorService createEs(IntSupplier maxThreadsSupplier, String name,
+      BlockingQueue<Runnable> queue, OptionalInt priority) {
+    int maxThreads = maxThreadsSupplier.getAsInt();
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
-        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
-    return addEs(max, name, tp);
+        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name, priority));
+    return addEs(maxThreadsSupplier, name, tp);
+  }
+
+  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
+    IntSupplier maxThreadsSupplier = () -> conf.getSystemConfiguration().getCount(max);
+    return createEs(maxThreadsSupplier, name, queue, OptionalInt.empty());
   }
 
   private ExecutorService createEs(int min, int max, int timeout, String name) {
@@ -282,33 +233,78 @@ public class TabletServerResourceManager {
         new LinkedBlockingQueue<>(), new NamingThreadFactory(name)));
   }
 
-  /**
-   * Creates table specific thread pool for executing scan threads
-   * 
-   * @param instance
-   *          ZK instance.
-   * @param acuConf
-   *          accumulo configuration.
-   * @throws NamespaceNotFoundException
-   *           Error thrown by tables.getTableId when a name space is not found.
-   * @throws TableNotFoundException
-   *           Error thrown by tables.getTableId when a table is not found.
-   */
-  protected void createTablePools(Instance instance, AccumuloConfiguration acuConf)
-      throws NamespaceNotFoundException, TableNotFoundException {
-    for (Entry<String,String> entry : acuConf
-        .getAllPropertiesWithPrefix(Property.TSERV_READ_AHEAD_PREFIX).entrySet()) {
-      final String tableName = entry.getKey()
-          .substring(Property.TSERV_READ_AHEAD_PREFIX.getKey().length());
-      if (null == entry.getValue() || entry.getValue().length() == 0) {
-        throw new RuntimeException("Read ahead prefix is inproperly configured");
+  protected Map<String,ExecutorService> createScanExecutors(Instance instance, // NOTE(review): instance is not used in this method body
+      Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) { // scanExecQueues is handed to createPriorityExecutor for each config
+    Builder<String,ExecutorService> builder = ImmutableMap.builder();
+
+    for (ScanExecutorConfig sec : scanExecCfg) {
+      builder.put(sec.name, createPriorityExecutor(sec, scanExecQueues)); // one executor per configured name
+    }
+
+    return builder.build(); // immutable map keyed by executor name
+  }
+
+  private static class ScanExecutorImpl implements ScanExecutor {
+
+    private static class ConfigImpl implements ScanExecutor.Config {
+
+      final ScanExecutorConfig cfg;
+
+      public ConfigImpl(ScanExecutorConfig sec) {
+        this.cfg = sec;
+      }
+
+      @Override
+      public String getName() {
+        return cfg.name;
       }
-      final int maxThreads = Integer.parseInt(entry.getValue());
-      final String tableId = Tables.getTableId(instance, tableName).canonicalID();
-      tableThreadPools.put(tableId,
-          createPriorityExecutor(Property.TSERV_READ_AHEAD_PREFIX, entry.getKey(), maxThreads,
-              Property.TSERV_SESSION_COMPARATOR_CLASS, tableName + " specific read ahead"));
+
+      @Override
+      public int getMaxThreads() {
+        return cfg.maxThreads;
+      }
+
+      @Override
+      public Optional<String> getPrioritizerClass() {
+        return cfg.prioritizerClass;
+      }
+
+      @Override
+      public Map<String,String> getPrioritizerOptions() {
+        return cfg.prioritizerOpts;
+      }
+
     }
+
+    private final ConfigImpl config;
+    private final Queue<?> queue;
+
+    ScanExecutorImpl(ScanExecutorConfig sec, Queue<?> q) {
+      this.config = new ConfigImpl(sec);
+      this.queue = q;
+    }
+
+    @Override
+    public int getQueued() {
+      return queue.size();
+    }
+
+    @Override
+    public Config getConfig() {
+      return config;
+    }
+
+  }
+
+  private Map<String,ScanExecutor> createScanExecutorChoices( // builds a ScanExecutor view per config, backed by the queue registered under the same name
+      Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) {
+    Builder<String,ScanExecutor> builder = ImmutableMap.builder();
+
+    for (ScanExecutorConfig sec : scanExecCfg) {
+      builder.put(sec.name, new ScanExecutorImpl(sec, scanExecQueues.get(sec.name))); // queue is looked up by executor name; null if none was registered
+    }
+
+    return builder.build(); // immutable map keyed by executor name
   }
 
   public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
@@ -390,11 +386,6 @@ public class TabletServerResourceManager {
 
     activeAssignments = new ConcurrentHashMap<>();
 
-    readAheadThreadPool = createPriorityExecutor(Property.TSERV_READ_AHEAD_MAXCONCURRENT,
-        Property.TSERV_SESSION_COMPARATOR_CLASS, "tablet read ahead");
-    defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
-        "metadata tablets read ahead");
-
     summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS,
         "summary file retriever", 60, TimeUnit.SECONDS);
     summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60,
@@ -402,13 +393,11 @@ public class TabletServerResourceManager {
     summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS,
         "summary partition", 60, TimeUnit.SECONDS);
 
-    try {
-      createTablePools(tserver.getInstance(), acuConf);
-    } catch (NamespaceNotFoundException e) {
-      throw new RuntimeException(e);
-    } catch (TableNotFoundException e) {
-      throw new RuntimeException(e);
-    }
+    Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors();
+    Map<String,Queue<?>> scanExecQueues = new HashMap<>();
+    scanExecutors = createScanExecutors(tserver.getInstance(), scanExecCfg, scanExecQueues);
+    scanExecutorChoices = createScanExecutorChoices(scanExecCfg, scanExecQueues);
+
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
     Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
@@ -938,16 +927,35 @@ public class TabletServerResourceManager {
     }
   }
 
-  public void executeReadAhead(KeyExtent tablet, Runnable task) {
-    ExecutorService service = tableThreadPools.get(tablet.getTableId().canonicalID());
-    if (null != service) {
-      service.execute(task);
-    } else if (tablet.isRootTablet()) {
+  /**
+   * Runs a scan task on the executor chosen for the tablet, wrapping it so the session records
+   * run and idle times. Root tablet tasks run in the calling thread and metadata tablet tasks use
+   * the "meta" executor. Other tasks use the executor named by the dispatcher; if that name is
+   * unknown or is "meta", a warning is logged and the default executor is used instead.
+   */
+  public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, ScanSession scanInfo,
+      Runnable task) {
+
+    task = ScanSession.wrap(scanInfo, task);
+
+    if (tablet.isRootTablet()) {
       task.run();
     } else if (tablet.isMeta()) {
-      defaultReadAheadThreadPool.execute(task);
+      scanExecutors.get("meta").execute(task);
     } else {
-      readAheadThreadPool.execute(task);
+      String scanExecutorName = dispatcher.dispatch(scanInfo, scanExecutorChoices);
+      ExecutorService executor = scanExecutors.get(scanExecutorName);
+      if (executor == null) {
+        log.warn(
+            "For table id {}, {} dispatched to non-existent executor {}. Using default executor.",
+            tablet.getTableId(), dispatcher.getClass().getName(), scanExecutorName);
+        executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+      } else if ("meta".equals(scanExecutorName)) {
+        log.warn("For table id {}, {} dispatched to meta executor. Using default executor.",
+            tablet.getTableId(), dispatcher.getClass().getName());
+        executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+      }
+      executor.execute(task);
     }
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index aee4477..16b9c55 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -23,7 +23,7 @@ import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.TooManyFilesException;
-import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.SingleScanSession;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.accumulo.tserver.tablet.TabletClosedException;
@@ -48,7 +48,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> {
   @Override
   public void run() {
 
-    final ScanSession scanSession = (ScanSession) server.getSession(scanID);
+    final SingleScanSession scanSession = (SingleScanSession) server.getSession(scanID);
     String oldThreadName = Thread.currentThread().getName();
 
     try {
@@ -69,10 +69,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> {
         return;
       }
 
-      long t1 = System.currentTimeMillis();
       ScanBatch batch = scanSession.scanner.read();
-      long t2 = System.currentTimeMillis();
-      scanSession.nbTimes.addStat(t2 - t1);
 
       // there should only be one thing on the queue at a time, so
       // it should be ok to call add()
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
deleted file mode 100644
index d584242..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.session;
-
-public class DefaultSessionComparator extends SessionComparator {
-
-  @Override
-  public int compareSession(Session sessionA, Session sessionB) {
-
-    final long startTimeFirst = sessionA.startTime;
-    final long startTimeSecond = sessionB.startTime;
-
-    // use the lowest max idle time
-    final long maxIdle = sessionA.maxIdleAccessTime < sessionB.maxIdleAccessTime
-        ? sessionA.maxIdleAccessTime
-        : sessionB.maxIdleAccessTime;
-
-    final long currentTime = System.currentTimeMillis();
-
-    /*
-     * Multiply by -1 so that we have a sensical comparison. This means that if comparison < 0,
-     * sessionA is newer. If comparison > 0, this means that session B is newer
-     */
-    int comparison = -1 * Long.compare(startTimeFirst, startTimeSecond);
-
-    if (!(sessionA.lastExecTime == -1 && sessionB.lastExecTime == -1)) {
-      if (comparison >= 0) {
-        long idleTimeA = currentTime - sessionA.lastExecTime;
-
-        /*
-         * If session B is newer, let's make sure that we haven't reached the max idle time, where
-         * we have to begin aging A
-         */
-        if (idleTimeA > sessionA.maxIdleAccessTime) {
-          comparison = -1 * Long.valueOf(idleTimeA - maxIdle).intValue();
-        }
-      } else {
-        long idleTimeB = currentTime - sessionB.lastExecTime;
-
-        /*
-         * If session A is newer, let's make sure that B hasn't reached the max idle time, where we
-         * have to begin aging A
-         */
-        if (idleTimeB > sessionA.maxIdleAccessTime) {
-          comparison = 1 * Long.valueOf(idleTimeB - maxIdle).intValue();
-        }
-      }
-    }
-
-    return comparison;
-  }
-
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 981832a..f8db1f4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -21,7 +21,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
@@ -30,13 +29,9 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.tserver.scan.ScanTask;
 
-public class MultiScanSession extends Session {
+public class MultiScanSession extends ScanSession {
   public final KeyExtent threadPoolExtent;
-  public final HashSet<Column> columnSet = new HashSet<>();
   public final Map<KeyExtent,List<Range>> queries;
-  public final List<IterInfo> ssiList;
-  public final Map<String,Map<String,String>> ssio;
-  public final Authorizations auths;
   public final SamplerConfiguration samplerConfig;
   public final long batchTimeOut;
   public final String context;
@@ -53,11 +48,8 @@ public class MultiScanSession extends Session {
       Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
       Map<String,Map<String,String>> ssio, Authorizations authorizations,
       SamplerConfiguration samplerConfig, long batchTimeOut, String context) {
-    super(credentials);
+    super(credentials, new HashSet<>(), ssiList, ssio, authorizations);
     this.queries = queries;
-    this.ssiList = ssiList;
-    this.ssio = ssio;
-    this.auths = authorizations;
     this.threadPoolExtent = threadPoolExtent;
     this.samplerConfig = samplerConfig;
     this.batchTimeOut = batchTimeOut;
@@ -65,20 +57,20 @@ public class MultiScanSession extends Session {
   }
 
   @Override
+  public Type getScanType() {
+    return Type.MULTI;
+  }
+
+  @Override
+  public String getTableId() {
+    return threadPoolExtent.getTableId().canonicalID();
+  }
+
+  @Override
   public boolean cleanup() {
     if (lookupTask != null)
       lookupTask.cancel(true);
     // the cancellation should provide us the safety to return true here
     return true;
   }
-
-  /**
-   * Ensure that the runnable actually runs
-   */
-  @Override
-  public void run() {
-    super.run();
-    lookupTask.run();
-  }
-
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index a55de1e..5cde06c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -16,64 +16,148 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.Column;
-import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
 import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.tserver.scan.ScanTask;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
-import org.apache.accumulo.tserver.tablet.Scanner;
-
-public class ScanSession extends Session {
-  public final Stat nbTimes = new Stat();
-  public final KeyExtent extent;
-  public final Set<Column> columnSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public abstract class ScanSession extends Session implements ScanInfo {
+
+  public static class ScanMeasurer implements Runnable {
+
+    private ScanSession session;
+    private Runnable task;
+
+    ScanMeasurer(ScanSession session, Runnable task) {
+      this.session = session;
+      this.task = task;
+    }
+
+    @Override
+    public void run() {
+      long t1 = System.currentTimeMillis();
+      task.run();
+      long t2 = System.currentTimeMillis();
+      session.finishedRun(t1, t2);
+    }
+
+    public ScanInfo getScanInfo() {
+      return session;
+    }
+  }
+
+  public static ScanMeasurer wrap(ScanSession scanInfo, Runnable r) {
+    return new ScanMeasurer(scanInfo, r);
+  }
+
+  private OptionalLong lastRunTime = OptionalLong.empty();
+  private Stat idleStats = new Stat();
+  public Stat runStats = new Stat();
+
+  public final HashSet<Column> columnSet;
   public final List<IterInfo> ssiList;
   public final Map<String,Map<String,String>> ssio;
   public final Authorizations auths;
-  public final AtomicBoolean interruptFlag = new AtomicBoolean();
-  public long entriesReturned = 0;
-  public long batchCount = 0;
-  public volatile ScanTask<ScanBatch> nextBatchTask;
-  public Scanner scanner;
-  public final long readaheadThreshold;
-  public final long batchTimeOut;
-  public final String context;
-
-  public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet,
-      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations,
-      long readaheadThreshold, long batchTimeOut, String context) {
+
+  ScanSession(TCredentials credentials, HashSet<Column> cols, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, Authorizations auths) {
     super(credentials);
-    this.extent = extent;
-    this.columnSet = columnSet;
+    this.columnSet = cols;
     this.ssiList = ssiList;
     this.ssio = ssio;
-    this.auths = authorizations;
-    this.readaheadThreshold = readaheadThreshold;
-    this.batchTimeOut = batchTimeOut;
-    this.context = context;
+    this.auths = auths;
+  }
+
+  @Override
+  public long getCreationTime() {
+    return startTime;
+  }
+
+  @Override
+  public OptionalLong getLastRunTime() {
+    return lastRunTime;
   }
 
   @Override
-  public boolean cleanup() {
-    final boolean ret;
-    try {
-      if (nextBatchTask != null)
-        nextBatchTask.cancel(true);
-    } finally {
-      if (scanner != null)
-        ret = scanner.close();
-      else
-        ret = true;
+  public Stats getRunTimeStats() {
+    return runStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats() {
+    return idleStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats(long currentTime) {
+    long idleTime = currentTime - getLastRunTime().orElse(getCreationTime());
+    Preconditions.checkArgument(idleTime >= 0);
+    Stat copy = idleStats.copy();
+    copy.addStat(idleTime);
+    return copy;
+  }
+
+  @Override
+  public Set<Column> getFetchedColumns() {
+    return Collections.unmodifiableSet(columnSet);
+  }
+
+  private class IterConfImpl implements IteratorConfiguration {
+
+    private IterInfo ii;
+
+    IterConfImpl(IterInfo ii) {
+      this.ii = ii;
+    }
+
+    @Override
+    public String getIteratorClass() {
+      return ii.className;
+    }
+
+    @Override
+    public String getName() {
+      return ii.iterName;
+    }
+
+    @Override
+    public int getPriority() {
+      return ii.priority;
+    }
+
+    @Override
+    public Map<String,String> getOptions() {
+      Map<String,String> opts = ssio.get(ii.iterName);
+      return opts == null || opts.isEmpty() ? Collections.emptyMap()
+          : Collections.unmodifiableMap(opts);
     }
-    return ret;
   }
 
+  @Override
+  public Collection<IteratorConfiguration> getClientScanIterators() {
+    return Lists.transform(ssiList, IterConfImpl::new);
+  }
+
+  public void finishedRun(long start, long finish) {
+    long idleTime = start - getLastRunTime().orElse(getCreationTime());
+    long runTime = finish - start;
+    lastRunTime = OptionalLong.of(finish);
+    idleStats.addStat(idleTime);
+    runStats.addStat(runTime);
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 32f7e34..6bd52e4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -19,7 +19,7 @@ package org.apache.accumulo.tserver.session;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.rpc.TServerUtils;
 
-public abstract class Session implements Runnable {
+public class Session {
 
   enum State {
     NEW, UNRESERVED, RESERVED, REMOVED
@@ -27,9 +27,7 @@ public abstract class Session implements Runnable {
 
   public final String client;
   public long lastAccessTime;
-  protected volatile long lastExecTime = -1;
   public long startTime;
-  public long maxIdleAccessTime;
   State state = State.NEW;
   private final TCredentials credentials;
 
@@ -49,18 +47,4 @@ public abstract class Session implements Runnable {
   public boolean cleanup() {
     return true;
   }
-
-  @Override
-  public void run() {
-    lastExecTime = System.currentTimeMillis();
-  }
-
-  public void setLastExecutionTime(long lastExecTime) {
-    this.lastExecTime = lastExecTime;
-  }
-
-  public long getLastExecutionTime() {
-    return lastExecTime;
-  }
-
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index c485698..1ba2491 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -231,9 +231,6 @@ public class SessionManager {
             configuredIdle = maxUpdateIdle;
           }
           long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-          if (idleTime > session.maxIdleAccessTime) {
-            session.maxIdleAccessTime = idleTime;
-          }
           if (idleTime > configuredIdle) {
             log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(),
                 session.client, idleTime);
@@ -318,8 +315,8 @@ public class SessionManager {
       ScanTask nbt = null;
       Table.ID tableID = null;
 
-      if (session instanceof ScanSession) {
-        ScanSession ss = (ScanSession) session;
+      if (session instanceof SingleScanSession) {
+        SingleScanSession ss = (SingleScanSession) session;
         nbt = ss.nextBatchTask;
         tableID = ss.extent.getTableId();
       } else if (session instanceof MultiScanSession) {
@@ -365,8 +362,8 @@ public class SessionManager {
 
     for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
       Session session = entry.getValue();
-      if (session instanceof ScanSession) {
-        ScanSession ss = (ScanSession) session;
+      if (session instanceof SingleScanSession) {
+        SingleScanSession ss = (SingleScanSession) session;
 
         ScanState state = ScanState.RUNNING;
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
similarity index 80%
copy from server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
copy to server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
index a55de1e..fb3e29f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
@@ -16,9 +16,9 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.Column;
@@ -26,18 +26,12 @@ import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.Stat;
 import org.apache.accumulo.tserver.scan.ScanTask;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Scanner;
 
-public class ScanSession extends Session {
-  public final Stat nbTimes = new Stat();
+public class SingleScanSession extends ScanSession {
   public final KeyExtent extent;
-  public final Set<Column> columnSet;
-  public final List<IterInfo> ssiList;
-  public final Map<String,Map<String,String>> ssio;
-  public final Authorizations auths;
   public final AtomicBoolean interruptFlag = new AtomicBoolean();
   public long entriesReturned = 0;
   public long batchCount = 0;
@@ -47,21 +41,27 @@ public class ScanSession extends Session {
   public final long batchTimeOut;
   public final String context;
 
-  public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet,
+  public SingleScanSession(TCredentials credentials, KeyExtent extent, HashSet<Column> columnSet,
       List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations,
       long readaheadThreshold, long batchTimeOut, String context) {
-    super(credentials);
+    super(credentials, columnSet, ssiList, ssio, authorizations);
     this.extent = extent;
-    this.columnSet = columnSet;
-    this.ssiList = ssiList;
-    this.ssio = ssio;
-    this.auths = authorizations;
     this.readaheadThreshold = readaheadThreshold;
     this.batchTimeOut = batchTimeOut;
     this.context = context;
   }
 
   @Override
+  public Type getScanType() {
+    return Type.SINGLE;
+  }
+
+  @Override
+  public String getTableId() {
+    return extent.getTableId().canonicalID();
+  }
+
+  @Override
   public boolean cleanup() {
     final boolean ret;
     try {
@@ -75,5 +75,4 @@ public class ScanSession extends Session {
     }
     return ret;
   }
-
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 18da8a4..dc5d684 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1540,7 +1540,8 @@ public class Tablet implements TabletCommitter {
     try {
       getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this, reason));
     } catch (RuntimeException t) {
-      log.debug("removing {} because we encountered an exception enqueing the CompactionRunner", reason, t);
+      log.debug("removing {} because we encountered an exception enqueing the CompactionRunner",
+          reason, t);
       majorCompactionQueued.remove(reason);
       throw t;
     }
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
deleted file mode 100644
index f7cc5cd..0000000
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.session;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-public class SessionComparatorTest {
-
-  @Test
-  public void testSingleScanMultiScanNoRun() {
-    long time = System.currentTimeMillis();
-    ScanSession sessionA = emptyScanSession();
-    sessionA.lastAccessTime = 0;
-    sessionA.maxIdleAccessTime = 0;
-    sessionA.startTime = time - 1000;
-
-    MultiScanSession sessionB = emptyMultiScanSession();
-    sessionB.lastAccessTime = 0;
-    sessionB.maxIdleAccessTime = 1000;
-    sessionB.startTime = time - 800;
-
-    ScanSession sessionC = emptyScanSession();
-    sessionC.lastAccessTime = 0;
-    sessionC.maxIdleAccessTime = 1000;
-    sessionC.startTime = time - 800;
-
-    // a has never run, so it should be given priority
-    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
-    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    assertEquals(1, comparator.compareSession(sessionB, sessionA));
-
-    // now let's assume they have been executed
-
-    assertEquals(1, comparator.compareSession(sessionA, sessionC));
-
-    assertEquals(0, comparator.compareSession(sessionC, sessionC));
-
-  }
-
-  @Test
-  public void testSingleScanRun() {
-    long time = System.currentTimeMillis();
-    ScanSession sessionA = emptyScanSession();
-    sessionA.lastAccessTime = 0;
-    sessionA.setLastExecutionTime(time);
-    sessionA.maxIdleAccessTime = 1000;
-    sessionA.startTime = time - 1000;
-
-    ScanSession sessionB = emptyScanSession();
-    sessionB.lastAccessTime = 0;
-    sessionB.setLastExecutionTime(time - 2000);
-    sessionB.maxIdleAccessTime = 1000;
-    sessionB.startTime = time - 800;
-
-    // b is newer
-    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
-    assertEquals(1, comparator.compareSession(sessionA, sessionB));
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
-
-    sessionB.setLastExecutionTime(time);
-    sessionA.setLastExecutionTime(time - 2000);
-
-    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    int comp = comparator.compareSession(sessionB, sessionA);
-    assertTrue("comparison is " + comp, comp >= 1);
-  }
-
-  @Test
-  public void testSingleScanMultiScanRun() {
-    long time = System.currentTimeMillis();
-    ScanSession sessionA = emptyScanSession();
-    sessionA.lastAccessTime = 0;
-    sessionA.setLastExecutionTime(time);
-    sessionA.maxIdleAccessTime = 1000;
-    sessionA.startTime = time - 1000;
-
-    MultiScanSession sessionB = emptyMultiScanSession();
-    sessionB.lastAccessTime = 0;
-    sessionB.setLastExecutionTime(time - 2000);
-    sessionB.maxIdleAccessTime = 1000;
-    sessionB.startTime = time - 800;
-
-    // b is newer
-    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
-    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    assertTrue(comparator.compareSession(sessionB, sessionA) > 0);
-
-    sessionB.setLastExecutionTime(time);
-    sessionA.setLastExecutionTime(time - 2000);
-
-    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    int comp = comparator.compareSession(sessionB, sessionA);
-    assertTrue("comparison is " + comp, comp > 0);
-  }
-
-  @Test
-  public void testMultiScanRun() {
-    long time = System.currentTimeMillis();
-    ScanSession sessionA = emptyScanSession();
-    sessionA.lastAccessTime = 0;
-    sessionA.setLastExecutionTime(time);
-    sessionA.maxIdleAccessTime = 1000;
-    sessionA.startTime = time - 1000;
-
-    ScanSession sessionB = emptyScanSession();
-    sessionB.lastAccessTime = 0;
-    sessionB.setLastExecutionTime(time - 2000);
-    sessionB.maxIdleAccessTime = 1000;
-    sessionB.startTime = time - 800;
-
-    // b is newer
-    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
-    assertEquals(1, comparator.compareSession(sessionA, sessionB));
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
-
-    sessionB.setLastExecutionTime(time);
-    sessionA.setLastExecutionTime(time - 2000);
-
-    
-    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    int comp = comparator.compareSession(sessionB, sessionA);
-    assertTrue("comparison is " + comp, comp >= 1);
-  }
-
-  private static ScanSession emptyScanSession() {
-    return new ScanSession(null, null, null, null, null, null, 0, 0, null);
-  }
-
-  private static MultiScanSession emptyMultiScanSession() {
-    return new MultiScanSession(null, null, null, null, null, null, null, 0, null);
-  }
-}
diff --git a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
index e807dfb..320b9c2 100644
--- a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
+++ b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
@@ -68,7 +68,7 @@ public class IMMLGBenchmark {
     }
 
     for (Entry<String,Stat> entry : stats.entrySet()) {
-      System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().getAverage());
+      System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().mean());
     }
 
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index cbeb1e4..907f2c6 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -606,9 +606,8 @@ public class CollectTabletStats {
   }
 
   private static void printStat(String desc, Stat s) {
-    System.out.printf(
-        "\t\tDescription: [%30s]  average: %,6.2f  std dev: %,6.2f  min: %,d  max: %,d %n", desc,
-        s.getAverage(), s.getStdDev(), s.getMin(), s.getMax());
+    System.out.printf("\t\tDescription: [%30s]  average: %,6.2f min: %,d  max: %,d %n", desc,
+        s.mean(), s.min(), s.max());
 
   }
 


[accumulo] 02/02: Merge branch 'ACCUMULO-4074-2'

Posted by kt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 5e5c74f3f11aa65ce8f1643f03c0e4efbbd1d0b3
Merge: f2d0371 aaa757e
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Jul 10 17:59:24 2018 -0400

    Merge branch 'ACCUMULO-4074-2'

 core/pom.xml                                       |  19 ++
 .../accumulo/core/conf/AccumuloConfiguration.java  | 136 ++++++++++++++
 .../accumulo/core/conf/DefaultConfiguration.java   |   5 +
 .../org/apache/accumulo/core/conf/Property.java    |  50 ++++-
 .../accumulo/core/conf/SiteConfiguration.java      |  10 +
 .../common/IteratorConfiguration.java}             |  31 ++--
 .../common/Stats.java}                             |  43 +++--
 .../core/spi/scan/IdleRatioScanPrioritizer.java    |  51 ++++++
 .../accumulo/core/spi/scan/ScanDispatcher.java     |  52 ++++++
 .../accumulo/core/spi/scan/ScanExecutor.java       |  60 ++++++
 .../apache/accumulo/core/spi/scan/ScanInfo.java    | 116 ++++++++++++
 .../scan/ScanPrioritizer.java}                     |  33 ++--
 .../core/spi/scan/SimpleScanDispatcher.java        |  74 ++++++++
 ....java => AccumuloUncaughtExceptionHandler.java} |  18 +-
 .../accumulo/core/util/NamingThreadFactory.java    |  19 +-
 .../java/org/apache/accumulo/core/util/Stat.java   |  75 ++++----
 .../core/conf/AccumuloConfigurationTest.java       | 112 +++++++++++-
 .../spi/scan/IdleRatioScanPrioritizerTest.java     |  61 +++++++
 .../core/spi/scan/SimpleScanDispatcherTest.java    |  62 +++++++
 .../accumulo/core/spi/scan/TestScanInfo.java       | 101 +++++++++++
 .../org/apache/accumulo/core/util/StatTest.java    |  40 ++--
 .../accumulo/server/conf/TableConfiguration.java   |  37 ++++
 .../accumulo/server/conf/ZooConfiguration.java     |  13 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  41 +++--
 .../tserver/TabletServerResourceManager.java       | 202 ++++++++++++++++++---
 .../accumulo/tserver/scan/NextBatchTask.java       |   7 +-
 .../accumulo/tserver/session/MultiScanSession.java |  22 ++-
 .../accumulo/tserver/session/ScanSession.java      | 162 +++++++++++++----
 .../apache/accumulo/tserver/session/Session.java   |   2 +-
 .../accumulo/tserver/session/SessionManager.java   |   8 +-
 .../{ScanSession.java => SingleScanSession.java}   |  29 ++-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   9 +-
 .../org/apache/accumulo/test/IMMLGBenchmark.java   |   2 +-
 .../test/performance/scan/CollectTabletStats.java  |   5 +-
 34 files changed, 1449 insertions(+), 258 deletions(-)