You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/07/10 22:02:33 UTC

[accumulo] 01/02: ACCUMULO-4074 Added support for multiple scan executors (#549)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit aaa757e4372a22b17201c1bee96b243b620345d3
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Jul 3 17:08:40 2018 -0400

    ACCUMULO-4074 Added support for multiple scan executors (#549)
    
     * Created plugin API for prioritizing scans that avoids accessing internal
       types.  This should make user written comparators more stable over time.
     * Made it possible to configure a comparator for each scan executor. Was
       previously a single comparator type for all executors.
     * Made it possible to pass options for comparator creation
     * Made it possible to configure a per table dispatcher. This dispatcher
       chooses which scan executor to use.  Dispatcher are user written plugins.
     * Fixed comparator type casting. I don't think the comparators in #510 worked
       because the Runnables were never the type expected. Also, abstracted all of
       this away from a user who may write a comparator.
---
 core/pom.xml                                       |  19 ++
 .../accumulo/core/conf/AccumuloConfiguration.java  | 136 +++++++++
 .../accumulo/core/conf/DefaultConfiguration.java   |   5 +
 .../org/apache/accumulo/core/conf/Property.java    |  55 +++-
 .../accumulo/core/conf/SiteConfiguration.java      |  10 +
 .../core/spi/common/IteratorConfiguration.java     |  30 +-
 .../common/Stats.java}                             |  36 ++-
 .../core/spi/scan/IdleRatioScanPrioritizer.java    |  51 ++++
 .../accumulo/core/spi/scan/ScanDispatcher.java     |  52 ++++
 .../accumulo/core/spi/scan/ScanExecutor.java       |  60 ++++
 .../apache/accumulo/core/spi/scan/ScanInfo.java    | 116 ++++++++
 .../accumulo/core/spi/scan/ScanPrioritizer.java    |  22 +-
 .../core/spi/scan/SimpleScanDispatcher.java        |  74 +++++
 .../util/AccumuloUncaughtExceptionHandler.java     |   1 -
 .../accumulo/core/util/NamingThreadFactory.java    |  16 +-
 .../java/org/apache/accumulo/core/util/Stat.java   |  75 ++---
 .../core/conf/AccumuloConfigurationTest.java       | 112 +++++++-
 .../spi/scan/IdleRatioScanPrioritizerTest.java     |  61 +++++
 .../core/spi/scan/SimpleScanDispatcherTest.java    |  62 +++++
 .../accumulo/core/spi/scan/TestScanInfo.java       | 101 +++++++
 .../org/apache/accumulo/core/util/StatTest.java    |  40 ++-
 .../accumulo/server/conf/TableConfiguration.java   |  37 +++
 .../accumulo/server/conf/ZooConfiguration.java     |  13 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  41 ++-
 .../tserver/TabletServerResourceManager.java       | 304 +++++++++++----------
 .../accumulo/tserver/scan/NextBatchTask.java       |   7 +-
 .../tserver/session/DefaultSessionComparator.java  |  67 -----
 .../accumulo/tserver/session/MultiScanSession.java |  32 +--
 .../accumulo/tserver/session/ScanSession.java      | 162 ++++++++---
 .../apache/accumulo/tserver/session/Session.java   |  18 +-
 .../accumulo/tserver/session/SessionManager.java   |  11 +-
 .../{ScanSession.java => SingleScanSession.java}   |  29 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   3 +-
 .../tserver/session/SessionComparatorTest.java     | 170 ------------
 .../org/apache/accumulo/test/IMMLGBenchmark.java   |   2 +-
 .../test/performance/scan/CollectTabletStats.java  |   5 +-
 36 files changed, 1411 insertions(+), 624 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index cf45283..4535f50 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -212,6 +212,25 @@
               </allows>
             </configuration>
           </execution>
+          <execution>
+            <id>apilyzer-spi</id>
+            <goals>
+              <goal>analyze</goal>
+            </goals>
+            <configuration>
+              <outputFile>${project.build.directory}/apilyzer-spi.txt</outputFile>
+              <includes>
+                <include>org[.]apache[.]accumulo[.]core[.]spi[.].*</include>
+              </includes>
+              <excludes />
+              <allows>
+                <allow>org[.]apache[.]hadoop[.]io[.]Text</allow>
+                <allow>org[.]apache[.]accumulo[.]core[.]client(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+                <allow>org[.]apache[.]accumulo[.]core[.]data(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+                <allow>org[.]apache[.]accumulo[.]core[.]security(?!.*[.](crypto)[.].*)[.].*</allow>
+              </allows>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
index 5a9ce21..5427d79 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
@@ -16,12 +16,17 @@
  */
 package org.apache.accumulo.core.conf;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -29,10 +34,12 @@ import java.util.function.Predicate;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.PropertyType.PortRange;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 /**
@@ -348,6 +355,135 @@ public abstract class AccumuloConfiguration implements Iterable<Entry<String,Str
     return maxFilesPerTablet;
   }
 
+  public class ScanExecutorConfig {
+    public final String name;
+    public final int maxThreads;
+    public final OptionalInt priority;
+    public final Optional<String> prioritizerClass;
+    public final Map<String,String> prioritizerOpts;
+
+    public ScanExecutorConfig(String name, int maxThreads, OptionalInt priority,
+        Optional<String> comparatorFactory, Map<String,String> comparatorFactoryOpts) {
+      this.name = name;
+      this.maxThreads = maxThreads;
+      this.priority = priority;
+      this.prioritizerClass = comparatorFactory;
+      this.prioritizerOpts = comparatorFactoryOpts;
+    }
+
+    /**
+     * Re-reads the max threads from the configuration that created this class
+     */
+    public int getCurrentMaxThreads() {
+      Integer depThreads = getDeprecatedScanThreads(name);
+      if (depThreads != null) {
+        return depThreads;
+      }
+
+      String prop = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + name + "." + SCAN_EXEC_THREADS;
+      String val = getAllPropertiesWithPrefix(Property.TSERV_SCAN_EXECUTORS_PREFIX).get(prop);
+      return Integer.parseInt(val);
+    }
+  }
+
+  public boolean isPropertySet(Property prop) {
+    throw new UnsupportedOperationException();
+  }
+
+  @SuppressWarnings("deprecation")
+  Integer getDeprecatedScanThreads(String name) {
+
+    Property prop;
+    Property deprecatedProp;
+
+    if (name.equals(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME)) {
+      prop = Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS;
+      deprecatedProp = Property.TSERV_READ_AHEAD_MAXCONCURRENT;
+    } else if (name.equals("meta")) {
+      prop = Property.TSERV_SCAN_EXECUTORS_META_THREADS;
+      deprecatedProp = Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT;
+    } else {
+      return null;
+    }
+
+    if (!isPropertySet(prop) && isPropertySet(deprecatedProp)) {
+      log.warn("Property {} is deprecated, use {} instead.", prop.getKey(),
+          deprecatedProp.getKey());
+      return Integer.valueOf(get(deprecatedProp));
+    } else if (isPropertySet(prop) && isPropertySet(deprecatedProp)) {
+      log.warn("Deprecated property {} ignored because {} is set", deprecatedProp.getKey(),
+          prop.getKey());
+    }
+
+    return null;
+  }
+
+  private static final String SCAN_EXEC_THREADS = "threads";
+  private static final String SCAN_EXEC_PRIORITY = "priority";
+  private static final String SCAN_EXEC_PRIORITIZER = "prioritizer";
+  private static final String SCAN_EXEC_PRIORITIZER_OPTS = "prioritizer.opts.";
+
+  public Collection<ScanExecutorConfig> getScanExecutors() {
+
+    Map<String,Map<String,String>> propsByName = new HashMap<>();
+
+    List<ScanExecutorConfig> scanResources = new ArrayList<>();
+
+    for (Entry<String,String> entry : getAllPropertiesWithPrefix(
+        Property.TSERV_SCAN_EXECUTORS_PREFIX).entrySet()) {
+
+      String suffix = entry.getKey()
+          .substring(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey().length());
+      String[] tokens = suffix.split("\\.", 2);
+      String name = tokens[0];
+
+      propsByName.computeIfAbsent(name, k -> new HashMap<>()).put(tokens[1], entry.getValue());
+    }
+
+    for (Entry<String,Map<String,String>> entry : propsByName.entrySet()) {
+      String name = entry.getKey();
+      Integer threads = null;
+      Integer prio = null;
+      String prioritizerClass = null;
+      Map<String,String> prioritizerOpts = new HashMap<>();
+
+      for (Entry<String,String> subEntry : entry.getValue().entrySet()) {
+        String opt = subEntry.getKey();
+        String val = subEntry.getValue();
+
+        if (opt.equals(SCAN_EXEC_THREADS)) {
+          Integer depThreads = getDeprecatedScanThreads(name);
+          if (depThreads == null) {
+            threads = Integer.parseInt(val);
+          } else {
+            threads = depThreads;
+          }
+        } else if (opt.equals(SCAN_EXEC_PRIORITY)) {
+          prio = Integer.parseInt(val);
+        } else if (opt.equals(SCAN_EXEC_PRIORITIZER)) {
+          prioritizerClass = val;
+        } else if (opt.startsWith(SCAN_EXEC_PRIORITIZER_OPTS)) {
+          String key = opt.substring(SCAN_EXEC_PRIORITIZER_OPTS.length());
+          if (key.isEmpty()) {
+            throw new IllegalStateException("Invalid scan executor option : " + opt);
+          }
+          prioritizerOpts.put(key, val);
+        } else {
+          throw new IllegalStateException("Unkown scan executor option : " + opt);
+        }
+      }
+
+      Preconditions.checkArgument(threads != null && threads > 0,
+          "Scan resource %s incorrectly specified threads", name);
+
+      scanResources.add(new ScanExecutorConfig(name, threads,
+          prio == null ? OptionalInt.empty() : OptionalInt.of(prio),
+          Optional.ofNullable(prioritizerClass), prioritizerOpts));
+    }
+
+    return scanResources;
+  }
+
   /**
    * Invalidates the <code>ZooCache</code> used for storage and quick retrieval of properties for
    * this configuration.
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
index fc2891e..54b87cc 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
@@ -52,4 +52,9 @@ public class DefaultConfiguration extends AccumuloConfiguration {
     resolvedProps.entrySet().stream().filter(p -> filter.test(p.getKey()))
         .forEach(e -> props.put(e.getKey(), e.getValue()));
   }
+
+  @Override
+  public boolean isPropertySet(Property prop) {
+    return false;
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 98402ef..b7ff210 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -30,6 +30,9 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
 import org.apache.accumulo.core.util.interpret.DefaultScanInterpreter;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
@@ -430,14 +433,37 @@ public enum Property {
       "When a tablet server's SimpleTimer thread triggers to check idle"
           + " sessions, this configurable option will be used to evaluate update"
           + " sessions to determine if they can be closed due to inactivity"),
+  @Deprecated
   TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT,
-      "The maximum number of concurrent read ahead that will execute. This effectively"
-          + " limits the number of long running scans that can run concurrently per tserver."),
-  TSERV_READ_AHEAD_PREFIX("tserver.readahead.concurrent.table.", null, PropertyType.PREFIX,
-      "Properties in this category allow overriding of table specific read ahead pools"),
+      "This property is deprecated since 2.0.0, use tserver.scan.executors.default.threads "
+          + "instead. The maximum number of concurrent read ahead that will execute. This "
+          + "effectively limits the number of long running scans that can run concurrently "
+          + "per tserver.\""),
+  @Deprecated
   TSERV_METADATA_READ_AHEAD_MAXCONCURRENT("tserver.metadata.readahead.concurrent.max", "8",
       PropertyType.COUNT,
-      "The maximum number of concurrent metadata read ahead that will execute."),
+      "This property is deprecated since 2.0.0, use tserver.scan.executors.meta.threads instead. "
+          + "The maximum number of concurrent metadata read ahead that will execute."),
+  TSERV_SCAN_EXECUTORS_PREFIX("tserver.scan.executors.", null, PropertyType.PREFIX,
+      "Prefix for defining executors to service scans.  For each executor the number of threads, "
+          + "thread priority, and an optional prioritizer can be configured.  The prioritizer "
+          + "determines which scan an executor should run first and must implement "
+          + ScanPrioritizer.class.getName() + ". Tables can select an executor by setting"
+          + " table.scan.dispatcher. To configure a new executor, set "
+          + "tserver.scan.executors.<name>.threads=<number>.  Optionally, can also set "
+          + "tserver.scan.executors.<name>.priority=<number 1 to 10>, "
+          + "tserver.scan.executors.<name>.prioritizer=<class name>, and "
+          + "tserver.scan.executors.<name>.prioritizer.opts.<key>=<value>"),
+  TSERV_SCAN_EXECUTORS_DEFAULT_THREADS("tserver.scan.executors.default.threads", "16",
+      PropertyType.COUNT,
+      "The number of threads for the scan executor that tables use by default."),
+  TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER("tserver.scan.executors.default.prioritizer", "",
+      PropertyType.STRING,
+      "Prioritizer for the default scan executor.  Defaults to none which "
+          + "results in FIFO priority.  Set to a class that implements "
+          + ScanPrioritizer.class.getName() + " to configure one."),
+  TSERV_SCAN_EXECUTORS_META_THREADS("tserver.scan.executors.meta.threads", "8", PropertyType.COUNT,
+      "The number of threads for the metadata table scan executor."),
   TSERV_MIGRATE_MAXCONCURRENT("tserver.migrations.concurrent.max", "1", PropertyType.COUNT,
       "The maximum number of concurrent tablet migrations for a tablet server"),
   TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT,
@@ -541,9 +567,6 @@ public enum Property {
   TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", PropertyType.COUNT,
       "The number of threads on each tablet server available to retrieve"
           + " summary data, that is not currently in cache, from RFiles."),
-  TSERV_SESSION_COMPARATOR_CLASS("tserver.summary.comparator.class", "", PropertyType.CLASSNAME,
-      "A customizable Scan session comparator. Note that by default, the value is empty"
-          + " and thus uses no session comparator"),
 
   // accumulo garbage collector properties
   GC_PREFIX("gc.", null, PropertyType.PREFIX,
@@ -669,6 +692,19 @@ public enum Property {
       PropertyType.BYTES,
       "The max RFile size used for a merging minor compaction. The default"
           + " value of 0 disables a max file size."),
+  TABLE_SCAN_DISPATCHER("table.scan.dispatcher", SimpleScanDispatcher.class.getName(),
+      PropertyType.CLASSNAME,
+      "This class is used to dynamically dispatch scans to configured scan executors.  This setting"
+          + " defaults to " + SimpleScanDispatcher.class.getSimpleName()
+          + " which dispatches to an executor"
+          + " named 'default' when it is optionless. Setting the option "
+          + "'table.scan.dispatcher.opts.executor=<name>' causes "
+          + SimpleScanDispatcher.class.getSimpleName() + " to dispatch to the specified executor. "
+          + "It has more options listed in its javadoc. Configured classes must implement "
+          + ScanDispatcher.class.getName() + ".  This property is ignored for the root and metadata"
+          + " table.  The metadata table always dispatches to a scan executor named `meta`."),
+  TABLE_SCAN_DISPATCHER_OPTS("table.scan.dispatcher.opts.", null, PropertyType.PREFIX,
+      "Options for the table scan dispatcher"),
   TABLE_SCAN_MAXMEM("table.scan.max.memory", "512K", PropertyType.BYTES,
       "The maximum amount of memory that will be used to cache results of a client query/scan. "
           + "Once this limit is reached, the buffered data is sent to the client."),
@@ -1153,7 +1189,8 @@ public enum Property {
             || key.startsWith(Property.TABLE_REPLICATION_TARGET.getKey())
             || key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey())
             || key.startsWith(TABLE_SAMPLER_OPTS.getKey())
-            || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())));
+            || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())
+            || key.startsWith(TABLE_SCAN_DISPATCHER_OPTS.getKey())));
   }
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
index 001b1db..cf0fe25 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * An {@link AccumuloConfiguration} which first loads any properties set on the command-line (using
  * the -o option) and then from an XML file, usually accumulo-site.xml. This implementation supports
@@ -165,6 +167,14 @@ public class SiteConfiguration extends AccumuloConfiguration {
   }
 
   @Override
+  public boolean isPropertySet(Property prop) {
+    Preconditions.checkArgument(!prop.isSensitive(),
+        "This method not implemented for sensitive props");
+    return CliConfiguration.get(prop) != null || staticConfigs.containsKey(prop.getKey())
+        || getXmlConfig().get(prop.getKey()) != null || parent.isPropertySet(prop);
+  }
+
+  @Override
   public void getProperties(Map<String,String> props, Predicate<String> filter) {
     getProperties(props, filter, true);
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
similarity index 58%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
rename to core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
index 28e6ef2..40f05e5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
@@ -14,23 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.tserver.session;
+package org.apache.accumulo.core.spi.common;
 
-public class SingleRangePriorityComparator extends DefaultSessionComparator {
+import java.util.Map;
 
-  @Override
-  public int compareSession(Session sessionA, Session sessionB) {
-    int priority = super.compareSession(sessionA, sessionB);
+/**
+ * Provides information about a configured Accumulo Iterator
+ *
+ * @since 2.0.0
+ */
+public interface IteratorConfiguration {
+  String getIteratorClass();
+
+  String getName();
+
+  int getPriority();
 
-    if (sessionA instanceof MultiScanSession && sessionB instanceof ScanSession) {
-      if (priority < 0) {
-        priority *= -1;
-      }
-    } else if (sessionB instanceof MultiScanSession && sessionA instanceof ScanSession) {
-      if (priority > 0) {
-        priority *= -1;
-      }
-    }
-    return priority;
-  }
+  Map<String,String> getOptions();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
similarity index 58%
copy from core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
copy to core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
index f688010..a1b9322 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
@@ -14,21 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.core.util;
+package org.apache.accumulo.core.spi.common;
 
-import java.lang.Thread.UncaughtExceptionHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * @since 2.0.0
+ */
+public interface Stats {
 
-public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler {
+  /**
+   * @return the minimum data point seen, or 0 if no data was seen
+   */
+  long min();
 
-  private static final Logger log = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+  /**
+   * @return the maximum data point seen, or 0 if no data was seen
+   */
+  long max();
 
-  @Override
-  public void uncaughtException(Thread t, Throwable e) {
+  /**
+   * @return the mean of the data points seen, or {@link Double#NaN} if no data was seen
+   */
+  double mean();
 
-    log.error(String.format("Caught an exception in %s.  Shutting down.", t), e);
-  }
+  /**
+   * @return the sum of the data points seen, or 0 if no data was seen
+   */
+  long sum();
 
+  /**
+   * @return the number of data points seen
+   */
+  long num();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
new file mode 100644
index 0000000..c567460
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Prioritize scans based on the ratio of runTime/idleTime. Scans with a lower ratio have a higher
+ * priority. When the ratio is equal, the scan with the oldest last run time has the highest
+ * priority. If neither have run, then the oldest gets priority.
+ *
+ * @since 2.0.0
+ */
+public class IdleRatioScanPrioritizer implements ScanPrioritizer {
+  private static double idleRatio(long currTime, ScanInfo si) {
+    double totalRunTime = si.getRunTimeStats().sum();
+    double totalIdleTime = Math.max(1, si.getIdleTimeStats(currTime).sum());
+    return totalRunTime / totalIdleTime;
+  }
+
+  @Override
+  public Comparator<ScanInfo> createComparator(Map<String,String> options) {
+    Preconditions.checkArgument(options.isEmpty());
+
+    Comparator<ScanInfo> c1 = (si1, si2) -> {
+      long currTime = System.currentTimeMillis();
+      return Double.compare(idleRatio(currTime, si1), idleRatio(currTime, si2));
+    };
+
+    return c1.thenComparingLong(si -> si.getLastRunTime().orElse(0))
+        .thenComparingLong(si -> si.getCreationTime());
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
new file mode 100644
index 0000000..a3bc3f1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A per table scan dispatcher that decides which executor should be used to processes a scan. For
+ * information about configuring, find the documentation for the {@code table.scan.dispatcher} and
+ * {@code table.scan.dispatcher.opts.} properties.
+ *
+ * @since 2.0.0
+ */
+public interface ScanDispatcher {
+  /**
+   * This method is called once after a ScanDispatcher is instantiated.
+   *
+   * @param options
+   *          The configured options. For example if the table properties
+   *          {@code table.scan.dispatcher.opts.p1=abc} and
+   *          {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain
+   *          {@code p1=abc} and {@code p9=123}.
+   */
+  public default void init(Map<String,String> options) {
+    Preconditions.checkArgument(options.isEmpty(), "No options expected");
+  }
+
+  /**
+   * @param scanInfo
+   *          Information about the scan.
+   * @param scanExecutors
+   *          Information about the currently configured executors.
+   * @return Should return one of the executors named in scanExecutors.keySet()
+   */
+  String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors);
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
new file mode 100644
index 0000000..a55b090
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for obtaining information about a scan executor
+ *
+ * @since 2.0.0
+ */
+public interface ScanExecutor {
+
+  interface Config {
+    /**
+     * @return the unique name used to identified executor in config
+     */
+    String getName();
+
+    /**
+     * @return the max number of threads that were configured
+     */
+    int getMaxThreads();
+
+    /**
+     * @return the prioritizer that was configured
+     */
+    Optional<String> getPrioritizerClass();
+
+    /**
+     * @return the prioritizer options
+     */
+    Map<String,String> getPrioritizerOptions();
+  }
+
+  /**
+   * @return The number of task queued for the executor
+   */
+  int getQueued();
+
+  /**
+   * @return The configuration used to create the executor
+   */
+  Config getConfig();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
new file mode 100644
index 0000000..7961c21
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+
+/**
+ * Provides information about an active Accumulo scan against a tablet. Accumulo scans operate by
+ * repeatedly gathering batches of data and returning those to the client.
+ *
+ * <p>
+ * All times are in milliseconds and obtained using System.currentTimeMillis().
+ *
+ * @since 2.0.0
+ */
+public interface ScanInfo {
+
+  enum Type {
+    /**
+     * A single range scan started using a {@link Scanner}
+     */
+    SINGLE,
+    /**
+     * A multi range scan started using a {@link BatchScanner}
+     */
+    MULTI
+  }
+
+  Type getScanType();
+
+  String getTableId();
+
+  /**
+   * Returns the first time a tablet knew about a scan over its portion of data.
+   */
+  long getCreationTime();
+
+  /**
+   * If the scan has run, returns the last run time.
+   */
+  OptionalLong getLastRunTime();
+
+  /**
+   * Returns timing statistics about running and gathering a batches of data.
+   */
+  Stats getRunTimeStats();
+
+  /**
+   * Returns statistics about the time between running. These stats are only about the idle times
+   * before the last run time. The idle time after the last run time are not included. If the scan
+   * has never run, then there are no stats.
+   */
+  Stats getIdleTimeStats();
+
+  /**
+   * This method is similar to {@link #getIdleTimeStats()}, but it also includes the time period
+   * between the last run time and now in the stats. If the scan has never run, then the stats are
+   * computed using only {@code currentTime - creationTime}.
+   */
+  Stats getIdleTimeStats(long currentTime);
+
+  /**
+   * This method returns what column were fetched by a scan. When a family is fetched, a Column
+   * object where everything but the family is null is in the set.
+   *
+   * <p>
+   * The following example code shows how this method can be used to check if a family was fetched
+   * or a family+qualifier was fetched. If continually checking for the same column, should probably
+   * create a constant.
+   *
+   * <pre>
+   * <code>
+   *   boolean wasFamilyFetched(ScanInfo si, byte[] fam) {
+   *     Column family = new Column(fam, null, null);
+   *     return si.getFetchedColumns().contains(family);
+   *   }
+   *
+   *   boolean wasColumnFetched(ScanInfo si, byte[] fam, byte[] qual) {
+   *     Column col = new Column(fam, qual, null);
+   *     return si.getFetchedColumns().contains(col);
+   *   }
+   * </code>
+   * </pre>
+   *
+   *
+   * @return The family and family+qualifier pairs fetched.
+   */
+  Set<Column> getFetchedColumns();
+
+  /**
+   * @return iterators that where configured on the client side scanner
+   */
+  Collection<IteratorConfiguration> getClientScanIterators();
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
similarity index 65%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
rename to core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
index dcfa1d4..51a4254 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
@@ -14,19 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.tserver.session;
+package org.apache.accumulo.core.spi.scan;
 
 import java.util.Comparator;
+import java.util.Map;
 
-public abstract class SessionComparator implements Comparator<Runnable> {
-
-  @Override
-  public int compare(Runnable sessionA, Runnable sessionB) {
-    if (sessionA instanceof Session && sessionB instanceof Session)
-      return compareSession((Session) sessionA, (Session) sessionB);
-    else
-      return 0;
-  }
-
-  public abstract int compareSession(final Session sessionA, final Session sessionB);
+/**
+ * A factory for creating comparators used for prioritizing scans. For information about
+ * configuring, find the documentation for the {@code tserver.scan.executors.} property.
+ *
+ * @since 2.0.0
+ */
+public interface ScanPrioritizer {
+  Comparator<ScanInfo> createComparator(Map<String,String> options);
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
new file mode 100644
index 0000000..96e7d2c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * If no options are given, then this will dispatch to an executor named {@code default}. This
+ * dispatcher supports the following options.
+ *
+ * <UL>
+ * <LI>{@code table.scan.dispatcher.opts.executor=<scan executor name>} : dispatches all scans to
+ * the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.multi_executor=<scan executor name>} : dispatches batch
+ * scans to the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>} : dispatches regular
+ * scans to the named executor.</LI>
+ * </UL>
+ *
+ * The {@code multi_executor} and {@code single_executor} options override the {@code executor}
+ * option.
+ */
+
+public class SimpleScanDispatcher implements ScanDispatcher {
+
+  private final Set<String> VALID_OPTS = ImmutableSet.of("executor", "multi_executor",
+      "single_executor");
+  private String multiExecutor;
+  private String singleExecutor;
+
+  public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
+
+  @Override
+  public void init(Map<String,String> options) {
+    Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
+    Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : %s", invalidOpts);
+
+    String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
+    multiExecutor = options.getOrDefault("multi_executor", base);
+    singleExecutor = options.getOrDefault("single_executor", base);
+  }
+
+  @Override
+  public String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors) {
+    switch (scanInfo.getScanType()) {
+      case MULTI:
+        return multiExecutor;
+      case SINGLE:
+        return singleExecutor;
+      default:
+        throw new IllegalArgumentException("Unexpected scan type " + scanInfo.getScanType());
+
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
index f688010..ed9c150 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
@@ -27,7 +27,6 @@ public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandle
 
   @Override
   public void uncaughtException(Thread t, Throwable e) {
-
     log.error(String.format("Caught an exception in %s.  Shutting down.", t), e);
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
index 0b4730c..57d429b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.util;
 
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.OptionalInt;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -26,20 +28,30 @@ import org.slf4j.LoggerFactory;
 public class NamingThreadFactory implements ThreadFactory {
   private static final Logger log = LoggerFactory.getLogger(NamingThreadFactory.class);
 
-  private static final AccumuloUncaughtExceptionHandler uncaughtHandler = new AccumuloUncaughtExceptionHandler();
+  private static final UncaughtExceptionHandler UEH = new AccumuloUncaughtExceptionHandler();
 
   private AtomicInteger threadNum = new AtomicInteger(1);
   private String name;
+  private OptionalInt priority;
 
   public NamingThreadFactory(String name) {
     this.name = name;
+    this.priority = OptionalInt.empty();
+  }
+
+  public NamingThreadFactory(String name, OptionalInt priority) {
+    this.name = name;
+    this.priority = priority;
   }
 
   @Override
   public Thread newThread(Runnable r) {
     Thread thread = new Daemon(new LoggingRunnable(log, r),
         name + " " + threadNum.getAndIncrement());
-    thread.setUncaughtExceptionHandler(uncaughtHandler);
+    thread.setUncaughtExceptionHandler(UEH);
+    if (priority.isPresent()) {
+      thread.setPriority(priority.getAsInt());
+    }
     return thread;
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Stat.java b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
index 8bdeb63..704622b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Stat.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
@@ -16,67 +16,70 @@
  */
 package org.apache.accumulo.core.util;
 
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.accumulo.core.spi.common.Stats;
 import org.apache.commons.math3.stat.descriptive.moment.Mean;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.rank.Max;
-import org.apache.commons.math3.stat.descriptive.rank.Min;
-import org.apache.commons.math3.stat.descriptive.summary.Sum;
 
-public class Stat {
-  Min min;
-  Max max;
-  Sum sum;
+public class Stat implements Stats {
+  long min;
+  long max;
+  long sum;
   Mean mean;
-  StandardDeviation sd;
-
-  StorelessUnivariateStatistic[] stats;
 
   public Stat() {
-    min = new Min();
-    max = new Max();
-    sum = new Sum();
     mean = new Mean();
-    sd = new StandardDeviation();
-
-    stats = new StorelessUnivariateStatistic[] {min, max, sum, mean, sd};
+    clear();
   }
 
   public void addStat(long stat) {
-    for (StorelessUnivariateStatistic statistic : stats) {
-      statistic.increment(stat);
-    }
+    min = Math.min(min, stat);
+    max = Math.max(max, stat);
+    sum += stat;
+    mean.increment(stat);
   }
 
-  public long getMin() {
-    return (long) min.getResult();
+  @Override
+  public long min() {
+    return num() == 0 ? 0L : min;
   }
 
-  public long getMax() {
-    return (long) max.getResult();
+  @Override
+  public long max() {
+    return num() == 0 ? 0L : max;
   }
 
-  public long getSum() {
-    return (long) sum.getResult();
+  @Override
+  public long sum() {
+    return sum;
   }
 
-  public double getAverage() {
+  @Override
+  public double mean() {
     return mean.getResult();
   }
 
-  public double getStdDev() {
-    return sd.getResult();
-  }
-
   @Override
   public String toString() {
-    return String.format("%,d %,d %,.2f %,d", getMin(), getMax(), getAverage(), mean.getN());
+    return String.format("%,d %,d %,.2f %,d", min(), max(), mean(), mean.getN());
   }
 
   public void clear() {
-    for (StorelessUnivariateStatistic statistic : stats) {
-      statistic.clear();
-    }
+    min = Long.MAX_VALUE;
+    max = Long.MIN_VALUE;
+    sum = 0;
+    mean.clear();
+  }
+
+  @Override
+  public long num() {
+    return mean.getN();
   }
 
+  public Stat copy() {
+    Stat stat = new Stat();
+    stat.min = this.min;
+    stat.max = this.max;
+    stat.sum = this.sum;
+    stat.mean = this.mean.copy();
+    return stat;
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
index 9f77834..0fcfebd 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
@@ -21,11 +21,15 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.function.Predicate;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -131,6 +135,15 @@ public class AccumuloConfigurationTest {
 
     private HashMap<String,String> props = new HashMap<>();
     private int upCount = 0;
+    private AccumuloConfiguration parent;
+
+    TestConfiguration() {
+      parent = null;
+    }
+
+    TestConfiguration(AccumuloConfiguration parent) {
+      this.parent = parent;
+    }
 
     public void set(String p, String v) {
       props.put(p, v);
@@ -138,17 +151,29 @@ public class AccumuloConfigurationTest {
     }
 
     @Override
+    public boolean isPropertySet(Property prop) {
+      return props.containsKey(prop.getKey());
+    }
+
+    @Override
     public long getUpdateCount() {
       return upCount;
     }
 
     @Override
     public String get(Property property) {
-      return props.get(property.getKey());
+      String v = props.get(property.getKey());
+      if (v == null & parent != null) {
+        v = parent.get(property);
+      }
+      return v;
     }
 
     @Override
     public void getProperties(Map<String,String> output, Predicate<String> filter) {
+      if (parent != null) {
+        parent.getProperties(output, filter);
+      }
       for (Entry<String,String> entry : props.entrySet()) {
         if (filter.test(entry.getKey())) {
           output.put(entry.getKey(), entry.getValue());
@@ -267,4 +292,89 @@ public class AccumuloConfigurationTest {
     Map<String,String> pmL = tc.getAllPropertiesWithPrefix(Property.TABLE_ITERATOR_SCAN_PREFIX);
     assertSame(pmG, pmL);
   }
+
+  @Test
+  public void testScanExecutors() {
+    String defName = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+    TestConfiguration tc = new TestConfiguration(DefaultConfiguration.getInstance());
+
+    Collection<ScanExecutorConfig> executors = tc.getScanExecutors();
+
+    Assert.assertEquals(2, executors.size());
+
+    ScanExecutorConfig sec = executors.stream().filter(c -> c.name.equals(defName)).findFirst()
+        .get();
+    Assert.assertEquals(
+        Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    Assert.assertFalse(sec.priority.isPresent());
+    Assert.assertTrue(sec.prioritizerClass.get().isEmpty());
+    Assert.assertTrue(sec.prioritizerOpts.isEmpty());
+
+    // ensure deprecated props is read if nothing else is set
+    tc.set("tserver.readahead.concurrent.max", "6");
+    Assert.assertEquals(6, sec.getCurrentMaxThreads());
+    Assert.assertEquals(
+        Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    ScanExecutorConfig sec2 = tc.getScanExecutors().stream().filter(c -> c.name.equals(defName))
+        .findFirst().get();
+    Assert.assertEquals(6, sec2.maxThreads);
+
+    // ensure new prop overrides deperecated prop
+    tc.set(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey(), "9");
+    Assert.assertEquals(9, sec.getCurrentMaxThreads());
+    Assert.assertEquals(
+        Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    ScanExecutorConfig sec3 = tc.getScanExecutors().stream().filter(c -> c.name.equals(defName))
+        .findFirst().get();
+    Assert.assertEquals(9, sec3.maxThreads);
+
+    ScanExecutorConfig sec4 = executors.stream().filter(c -> c.name.equals("meta")).findFirst()
+        .get();
+    Assert.assertEquals(
+        Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getDefaultValue()),
+        sec4.maxThreads);
+    Assert.assertFalse(sec4.priority.isPresent());
+    Assert.assertFalse(sec4.prioritizerClass.isPresent());
+    Assert.assertTrue(sec4.prioritizerOpts.isEmpty());
+
+    tc.set("tserver.metadata.readahead.concurrent.max", "2");
+    Assert.assertEquals(2, sec4.getCurrentMaxThreads());
+    ScanExecutorConfig sec5 = tc.getScanExecutors().stream().filter(c -> c.name.equals("meta"))
+        .findFirst().get();
+    Assert.assertEquals(2, sec5.maxThreads);
+
+    tc.set(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getKey(), "3");
+    Assert.assertEquals(3, sec4.getCurrentMaxThreads());
+    ScanExecutorConfig sec6 = tc.getScanExecutors().stream().filter(c -> c.name.equals("meta"))
+        .findFirst().get();
+    Assert.assertEquals(3, sec6.maxThreads);
+
+    String prefix = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey();
+    tc.set(prefix + "hulksmash.threads", "66");
+    tc.set(prefix + "hulksmash.priority", "3");
+    tc.set(prefix + "hulksmash.prioritizer", "com.foo.ScanPrioritizer");
+    tc.set(prefix + "hulksmash.prioritizer.opts.k1", "v1");
+    tc.set(prefix + "hulksmash.prioritizer.opts.k2", "v3");
+
+    executors = tc.getScanExecutors();
+    Assert.assertEquals(3, executors.size());
+    ScanExecutorConfig sec7 = executors.stream().filter(c -> c.name.equals("hulksmash")).findFirst()
+        .get();
+    Assert.assertEquals(66, sec7.maxThreads);
+    Assert.assertEquals(3, sec7.priority.getAsInt());
+    Assert.assertEquals("com.foo.ScanPrioritizer", sec7.prioritizerClass.get());
+    Assert.assertEquals(ImmutableMap.of("k1", "v1", "k2", "v3"), sec7.prioritizerOpts);
+
+    tc.set(prefix + "hulksmash.threads", "44");
+    Assert.assertEquals(66, sec7.maxThreads);
+    Assert.assertEquals(44, sec7.getCurrentMaxThreads());
+
+    ScanExecutorConfig sec8 = tc.getScanExecutors().stream().filter(c -> c.name.equals("hulksmash"))
+        .findFirst().get();
+    Assert.assertEquals(44, sec8.maxThreads);
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
new file mode 100644
index 0000000..0128b20
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IdleRatioScanPrioritizerTest {
+
+  @Test
+  public void testSort() {
+    long now = System.currentTimeMillis();
+
+    List<TestScanInfo> scans = new ArrayList<>();
+
+    // Two following have never run, so oldest should go first
+    scans.add(new TestScanInfo("a", Type.SINGLE, now - 3));
+    scans.add(new TestScanInfo("b", Type.SINGLE, now - 8));
+    // Two following have different idle ratio and same last run times
+    scans.add(new TestScanInfo("c", Type.SINGLE, now - 16, 2, 10));
+    scans.add(new TestScanInfo("d", Type.SINGLE, now - 16, 5, 10));
+    // Two following have same idle ratio and different last run times
+    scans.add(new TestScanInfo("e", Type.SINGLE, now - 12, 5, 9));
+    scans.add(new TestScanInfo("f", Type.SINGLE, now - 12, 3, 7));
+
+    Collections.shuffle(scans);
+
+    Comparator<ScanInfo> comparator = new IdleRatioScanPrioritizer()
+        .createComparator(Collections.emptyMap());
+
+    Collections.sort(scans, comparator);
+
+    Assert.assertEquals("b", scans.get(0).testId);
+    Assert.assertEquals("a", scans.get(1).testId);
+    Assert.assertEquals("f", scans.get(2).testId);
+    Assert.assertEquals("e", scans.get(3).testId);
+    Assert.assertEquals("d", scans.get(4).testId);
+    Assert.assertEquals("c", scans.get(5).testId);
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
new file mode 100644
index 0000000..b59858a
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SimpleScanDispatcherTest {
+  @Test
+  public void testProps() {
+    Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey()
+        .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".threads"));
+    Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER.getKey()
+        .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".prioritizer"));
+  }
+
+  private void runTest(Map<String,String> opts, String expectedSingle, String expectedMulti) {
+    ScanInfo msi = new TestScanInfo("a", Type.MULTI, 4);
+    ScanInfo ssi = new TestScanInfo("a", Type.SINGLE, 4);
+
+    SimpleScanDispatcher ssd1 = new SimpleScanDispatcher();
+    ssd1.init(opts);
+    Assert.assertEquals(expectedMulti, ssd1.dispatch(msi, null));
+    Assert.assertEquals(expectedSingle, ssd1.dispatch(ssi, null));
+  }
+
+  @Test
+  public void testBasic() {
+    String dname = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+    runTest(Collections.emptyMap(), dname, dname);
+    runTest(ImmutableMap.of("executor", "E1"), "E1", "E1");
+    runTest(ImmutableMap.of("single_executor", "E2"), "E2", dname);
+    runTest(ImmutableMap.of("multi_executor", "E3"), dname, "E3");
+    runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2"), "E2", "E1");
+    runTest(ImmutableMap.of("executor", "E1", "multi_executor", "E3"), "E1", "E3");
+    runTest(ImmutableMap.of("single_executor", "E2", "multi_executor", "E3"), "E2", "E3");
+    runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2", "multi_executor", "E3"),
+        "E2", "E3");
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
new file mode 100644
index 0000000..68a9650
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.util.Stat;
+
+public class TestScanInfo implements ScanInfo {
+
+  String testId;
+  Type scanType;
+  long creationTime;
+  OptionalLong lastRunTime = OptionalLong.empty();
+  Stat runTimeStats = new Stat();
+  Stat idleTimeStats = new Stat();
+
+  TestScanInfo(String testId, Type scanType, long creationTime, int... times) {
+    this.testId = testId;
+    this.scanType = scanType;
+    this.creationTime = creationTime;
+
+    for (int i = 0; i < times.length; i += 2) {
+      long idleDuration = times[i] - (i == 0 ? 0 : times[i - 1]);
+      long runDuration = times[i + 1] - times[i];
+      runTimeStats.addStat(runDuration);
+      idleTimeStats.addStat(idleDuration);
+    }
+
+    if (times.length > 0) {
+      lastRunTime = OptionalLong.of(times[times.length - 1] + creationTime);
+    }
+  }
+
+  @Override
+  public Type getScanType() {
+    return scanType;
+  }
+
+  @Override
+  public String getTableId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  @Override
+  public OptionalLong getLastRunTime() {
+    return lastRunTime;
+  }
+
+  @Override
+  public Stats getRunTimeStats() {
+    return runTimeStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats() {
+    return idleTimeStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats(long currentTime) {
+    Stat copy = idleTimeStats.copy();
+    copy.addStat(currentTime - lastRunTime.orElse(creationTime));
+    return copy;
+  }
+
+  @Override
+  public Set<Column> getFetchedColumns() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<IteratorConfiguration> getClientScanIterators() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
index 69df38d..a2986ac 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
@@ -43,32 +43,26 @@ public class StatTest {
 
   @Test
   public void testGetMin() {
-    assertEquals(0, zero.getMin());
-    assertEquals(3677, stat.getMin());
+    assertEquals(0, zero.min());
+    assertEquals(3677, stat.min());
   }
 
   @Test
   public void testGetMax() {
-    assertEquals(0, zero.getMax());
-    assertEquals(9792, stat.getMax());
+    assertEquals(0, zero.max());
+    assertEquals(9792, stat.max());
   }
 
   @Test
   public void testGetAverage() {
-    assertEquals(0, zero.getAverage(), delta);
-    assertEquals(5529, stat.getAverage(), delta);
-  }
-
-  @Test
-  public void testGetStdDev() {
-    assertEquals(0, zero.getStdDev(), delta);
-    assertEquals(2073.7656569632, stat.getStdDev(), delta);
+    assertEquals(0, zero.mean(), delta);
+    assertEquals(5529, stat.mean(), delta);
   }
 
   @Test
   public void testGetSum() {
-    assertEquals(0, zero.getSum());
-    assertEquals(38703, stat.getSum());
+    assertEquals(0, zero.sum());
+    assertEquals(38703, stat.sum());
   }
 
   @Test
@@ -76,16 +70,14 @@ public class StatTest {
     zero.clear();
     stat.clear();
 
-    assertEquals(0, zero.getMax());
-    assertEquals(zero.getMax(), stat.getMax());
-    assertEquals(0, zero.getMin());
-    assertEquals(zero.getMin(), stat.getMin());
-    assertEquals(0, zero.getSum());
-    assertEquals(zero.getSum(), stat.getSum());
+    assertEquals(0, zero.max());
+    assertEquals(zero.max(), stat.max());
+    assertEquals(0, zero.min());
+    assertEquals(zero.min(), stat.min());
+    assertEquals(0, zero.sum());
+    assertEquals(zero.sum(), stat.sum());
 
-    assertEquals(Double.NaN, zero.getAverage(), 0);
-    assertEquals(zero.getAverage(), stat.getAverage(), 0);
-    assertEquals(Double.NaN, zero.getStdDev(), 0);
-    assertEquals(zero.getStdDev(), stat.getStdDev(), 0);
+    assertEquals(Double.NaN, zero.mean(), 0);
+    assertEquals(zero.mean(), stat.mean(), 0);
   }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index dbb7ca4..cc67feb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.conf;
 import static java.util.Objects.requireNonNull;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
@@ -216,4 +218,39 @@ public class TableConfiguration extends ObservableConfiguration {
 
     return pic;
   }
+
+  public static class TablesScanDispatcher {
+    public final ScanDispatcher dispatcher;
+    public final long count;
+
+    public TablesScanDispatcher(ScanDispatcher dispatcher, long count) {
+      this.dispatcher = dispatcher;
+      this.count = count;
+    }
+  }
+
+  private AtomicReference<TablesScanDispatcher> scanDispatcherRef = new AtomicReference<>();
+
+  public ScanDispatcher getScanDispatcher() {
+    long count = getUpdateCount();
+    TablesScanDispatcher currRef = scanDispatcherRef.get();
+    if (currRef == null || currRef.count != count) {
+      ScanDispatcher newDispatcher = Property.createTableInstanceFromPropertyName(this,
+          Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null);
+
+      Map<String,String> opts = new HashMap<>();
+      getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) -> {
+        String optKey = k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length());
+        opts.put(optKey, v);
+      });
+
+      newDispatcher.init(Collections.unmodifiableMap(opts));
+
+      TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher, count);
+      scanDispatcherRef.compareAndSet(currRef, newRef);
+      currRef = newRef;
+    }
+
+    return currRef.dispatcher;
+  }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
index 1973fd8..ab41141 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@ -81,11 +81,12 @@ public class ZooConfiguration extends AccumuloConfiguration {
   @Override
   public String get(Property property) {
     if (Property.isFixedZooPropertyKey(property)) {
-      if (fixedProps.containsKey(property.getKey())) {
-        return fixedProps.get(property.getKey());
+      String val = fixedProps.get(property.getKey());
+      if (val != null) {
+        return val;
       } else {
         synchronized (fixedProps) {
-          String val = _get(property);
+          val = _get(property);
           fixedProps.put(property.getKey(), val);
           return val;
         }
@@ -96,6 +97,12 @@ public class ZooConfiguration extends AccumuloConfiguration {
     }
   }
 
+  @Override
+  public boolean isPropertySet(Property prop) {
+    return fixedProps.containsKey(prop.getKey()) || getRaw(prop.getKey()) != null
+        || parent.isPropertySet(prop);
+  }
+
   private String getRaw(String key) {
     String zPath = propPathPrefix + "/" + key;
     byte[] v = propCache.get(zPath);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6e67cf0..14c0306 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -130,6 +130,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
 import org.apache.accumulo.core.summary.Gatherer;
 import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
 import org.apache.accumulo.core.summary.SummaryCollection;
@@ -250,9 +251,9 @@ import org.apache.accumulo.tserver.scan.NextBatchTask;
 import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.session.ConditionalSession;
 import org.apache.accumulo.tserver.session.MultiScanSession;
-import org.apache.accumulo.tserver.session.ScanSession;
 import org.apache.accumulo.tserver.session.Session;
 import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
 import org.apache.accumulo.tserver.session.SummarySession;
 import org.apache.accumulo.tserver.session.UpdateSession;
 import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
@@ -543,6 +544,16 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       }
     }
 
+    private ScanDispatcher getScanDispatcher(KeyExtent extent) {
+      if (extent.isRootTablet() || extent.isMeta()) {
+        // dispatcher is only for user tables
+        return null;
+      }
+
+      return getServerConfigurationFactory().getTableConfiguration(extent.getTableId())
+          .getScanDispatcher();
+    }
+
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
         TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
@@ -587,13 +598,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       if (tablet == null)
         throw new NotServingTabletException(textent);
 
-      Set<Column> columnSet = new HashSet<>();
+      HashSet<Column> columnSet = new HashSet<>();
       for (TColumn tcolumn : columns) {
         columnSet.add(new Column(tcolumn));
       }
 
-      final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio,
-          new Authorizations(authorizations), readaheadThreshold, batchTimeOut, context);
+      final SingleScanSession scanSession = new SingleScanSession(credentials, extent, columnSet,
+          ssiList, ssio, new Authorizations(authorizations), readaheadThreshold, batchTimeOut,
+          context);
       scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet,
           scanSession.auths, ssiList, ssio, isolated, scanSession.interruptFlag,
           SamplerConfigurationImpl.fromThrift(tSamplerConfig), scanSession.batchTimeOut,
@@ -619,7 +631,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
         throws NoSuchScanIDException, NotServingTabletException,
         org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
         TSampleNotPresentException {
-      ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID);
+      SingleScanSession scanSession = (SingleScanSession) sessionManager.reserveSession(scanID);
       if (scanSession == null) {
         throw new NoSuchScanIDException();
       }
@@ -631,7 +643,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       }
     }
 
-    private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession)
+    private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
         throws NoSuchScanIDException, NotServingTabletException,
         org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
         TSampleNotPresentException {
@@ -639,7 +651,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       if (scanSession.nextBatchTask == null) {
         scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID,
             scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+        resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
+            scanSession, scanSession.nextBatchTask);
       }
 
       ScanBatch bresult;
@@ -694,7 +707,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
         // to client
         scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID,
             scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+        resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
+            scanSession, scanSession.nextBatchTask);
       }
 
       if (!scanResult.more)
@@ -705,14 +719,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
     @Override
     public void closeScan(TInfo tinfo, long scanID) {
-      final ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
+      final SingleScanSession ss = (SingleScanSession) sessionManager.removeSession(scanID);
       if (ss != null) {
         long t2 = System.currentTimeMillis();
 
         if (log.isTraceEnabled()) {
           log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
               TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned,
-              (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
+              (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
         }
 
         if (scanMetrics.isEnabled()) {
@@ -818,7 +832,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
       if (session.lookupTask == null) {
         session.lookupTask = new LookupTask(TabletServer.this, scanID);
-        resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
+        resourceManager.executeReadAhead(session.threadPoolExtent,
+            getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
       }
 
       try {
@@ -1174,8 +1189,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
             String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
                 TServerUtils.clientAddress.get(), us.totalUpdates,
                 (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(),
-                us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
-                us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
+                us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0,
+                us.commitTimes.sum() / 1000.0));
       }
       if (us.failures.size() > 0) {
         Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 9a91e87..5ab6e27 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -21,10 +21,15 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Queue;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -36,12 +41,13 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
 
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.NamespaceNotFoundException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
@@ -50,6 +56,11 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheType;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanExecutor;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -67,15 +78,18 @@ import org.apache.accumulo.tserver.compaction.CompactionStrategy;
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
-import org.apache.accumulo.tserver.session.SessionComparator;
+import org.apache.accumulo.tserver.session.ScanSession;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.htrace.wrappers.TraceExecutorService;
+import org.apache.htrace.wrappers.TraceRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 
 /**
  * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
@@ -94,14 +108,13 @@ public class TabletServerResourceManager {
   private final ExecutorService migrationPool;
   private final ExecutorService assignmentPool;
   private final ExecutorService assignMetaDataPool;
-  private final ExecutorService readAheadThreadPool;
-  private final ExecutorService defaultReadAheadThreadPool;
   private final ExecutorService summaryRetrievalPool;
   private final ExecutorService summaryParitionPool;
   private final ExecutorService summaryRemotePool;
   private final Map<String,ExecutorService> threadPools = new TreeMap<>();
 
-  private final Map<String,ExecutorService> tableThreadPools = new TreeMap<>();
+  private final Map<String,ExecutorService> scanExecutors;
+  private final Map<String,ScanExecutor> scanExecutorChoices;
 
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
 
@@ -128,46 +141,15 @@ public class TabletServerResourceManager {
     return tp;
   }
 
-  private ExecutorService addEs(final Property maxThreads, String name,
-      final ThreadPoolExecutor tp) {
+  private ExecutorService addEs(IntSupplier maxThreads, String name, final ThreadPoolExecutor tp) {
     ExecutorService result = addEs(name, tp);
     SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() {
       @Override
       public void run() {
         try {
-          int max = tserver.getConfiguration().getCount(maxThreads);
+          int max = maxThreads.getAsInt();
           if (tp.getMaximumPoolSize() != max) {
-            log.info("Changing {} to {}", maxThreads.getKey(), max);
-            tp.setCorePoolSize(max);
-            tp.setMaximumPoolSize(max);
-          }
-        } catch (Throwable t) {
-          log.error("Failed to change thread pool size", t);
-        }
-      }
-
-    }, 1000, 10 * 1000);
-    return result;
-  }
-
-  private ExecutorService addEs(final int maxThreads, final Property prefix,
-      final String propertyName, String name, final ThreadPoolExecutor tp) {
-    ExecutorService result = addEs(name, tp);
-    SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          int max = maxThreads;
-          for (Entry<String,String> entry : conf.getSystemConfiguration()
-              .getAllPropertiesWithPrefix(prefix).entrySet()) {
-            if (entry.getKey().equals(propertyName)) {
-              if (null != entry.getValue() && entry.getValue().length() != 0)
-                max = Integer.parseInt(entry.getValue());
-              break;
-            }
-          }
-          if (tp.getMaximumPoolSize() != max) {
-            log.info("Changing {} to {}", maxThreads, max);
+            log.info("Changing max threads for {} to {}", name, max);
             tp.setCorePoolSize(max);
             tp.setMaximumPoolSize(max);
           }
@@ -187,7 +169,7 @@ public class TabletServerResourceManager {
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue,
         new NamingThreadFactory(name));
     tp.allowCoreThreadTimeOut(true);
-    return addEs(max, name, tp);
+    return addEs(() -> conf.getSystemConfiguration().getCount(max), name, tp);
   }
 
   private ExecutorService createEs(int max, String name) {
@@ -198,83 +180,52 @@ public class TabletServerResourceManager {
     return createEs(max, name, new LinkedBlockingQueue<>());
   }
 
-  private ExecutorService createEs(int maxThreads, Property prefix, String propertyName,
-      String name, BlockingQueue<Runnable> queue) {
-    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
-        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
-    return addEs(maxThreads, prefix, propertyName, name, tp);
-  }
+  private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
+      Map<String,Queue<?>> scanExecQueues) {
 
-  /**
-   * If we cannot instantiate the comparator we will default to the linked blocking queue comparator
-   * 
-   * @param max
-   *          max number of threads
-   * @param comparator
-   *          comparator property
-   * @param name
-   *          name passed to the thread factory
-   * @return priority executor
-   */
-  private ExecutorService createPriorityExecutor(Property prefix, String propertyName,
-      final int maxThreads, Property comparator, String name) {
-
-    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+    BlockingQueue<Runnable> queue;
 
-    if (null == comparatorClazz || comparatorClazz.length() == 0) {
-      log.debug("Using no comparator");
-      return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>());
+    if (sec.prioritizerClass.orElse("").isEmpty()) {
+      queue = new LinkedBlockingQueue<>();
     } else {
-      SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
-          conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
-      if (null != comparatorObj) {
-        log.debug("Using priority based scheduler {}", comparatorClazz);
-        return createEs(maxThreads, prefix, propertyName, name,
-            new PriorityBlockingQueue<>(maxThreads, comparatorObj));
-      } else {
-        log.debug("Using no comparator");
-        return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>());
+      ScanPrioritizer factory = null;
+      try {
+        factory = ConfigurationTypeHelper.getClassInstance(null, sec.prioritizerClass.get(),
+            ScanPrioritizer.class);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
-    }
-  }
 
-  /**
-   * If we cannot instantiate the comparator we will default to the linked blocking queue comparator
-   * 
-   * @param max
-   *          max number of threads
-   * @param comparator
-   *          comparator property
-   * @param name
-   *          name passed to the thread factory
-   * @return priority executor
-   */
-  private ExecutorService createPriorityExecutor(Property max, Property comparator, String name) {
-    int maxThreads = conf.getSystemConfiguration().getCount(max);
+      if (factory == null) {
+        queue = new LinkedBlockingQueue<>();
+      } else {
+        Comparator<ScanInfo> comparator = factory.createComparator(sec.prioritizerOpts);
 
-    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+        // function to extract scan scan session from runnable
+        Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) ((TraceRunnable) r)
+            .getRunnable()).getScanInfo();
 
-    if (null == comparatorClazz || comparatorClazz.length() == 0) {
-      log.debug("Using no comparator");
-      return createEs(max, name, new LinkedBlockingQueue<>());
-    } else {
-      SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
-          conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
-      if (null != comparatorObj) {
-        log.debug("Using priority based scheduler {}", comparatorClazz);
-        return createEs(max, name, new PriorityBlockingQueue<>(maxThreads, comparatorObj));
-      } else {
-        log.debug("Using no comparator");
-        return createEs(max, name, new LinkedBlockingQueue<>());
+        queue = new PriorityBlockingQueue<>(sec.maxThreads,
+            Comparator.comparing(extractor, comparator));
       }
     }
+
+    scanExecQueues.put(sec.name, queue);
+
+    return createEs(() -> sec.getCurrentMaxThreads(), "scan-" + sec.name, queue, sec.priority);
   }
 
-  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
-    int maxThreads = conf.getSystemConfiguration().getCount(max);
+  private ExecutorService createEs(IntSupplier maxThreadsSupplier, String name,
+      BlockingQueue<Runnable> queue, OptionalInt priority) {
+    int maxThreads = maxThreadsSupplier.getAsInt();
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
-        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
-    return addEs(max, name, tp);
+        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name, priority));
+    return addEs(maxThreadsSupplier, name, tp);
+  }
+
+  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
+    IntSupplier maxThreadsSupplier = () -> conf.getSystemConfiguration().getCount(max);
+    return createEs(maxThreadsSupplier, name, queue, OptionalInt.empty());
   }
 
   private ExecutorService createEs(int min, int max, int timeout, String name) {
@@ -282,33 +233,78 @@ public class TabletServerResourceManager {
         new LinkedBlockingQueue<>(), new NamingThreadFactory(name)));
   }
 
-  /**
-   * Creates table specific thread pool for executing scan threads
-   * 
-   * @param instance
-   *          ZK instance.
-   * @param acuConf
-   *          accumulo configuration.
-   * @throws NamespaceNotFoundException
-   *           Error thrown by tables.getTableId when a name space is not found.
-   * @throws TableNotFoundException
-   *           Error thrown by tables.getTableId when a table is not found.
-   */
-  protected void createTablePools(Instance instance, AccumuloConfiguration acuConf)
-      throws NamespaceNotFoundException, TableNotFoundException {
-    for (Entry<String,String> entry : acuConf
-        .getAllPropertiesWithPrefix(Property.TSERV_READ_AHEAD_PREFIX).entrySet()) {
-      final String tableName = entry.getKey()
-          .substring(Property.TSERV_READ_AHEAD_PREFIX.getKey().length());
-      if (null == entry.getValue() || entry.getValue().length() == 0) {
-        throw new RuntimeException("Read ahead prefix is inproperly configured");
+  protected Map<String,ExecutorService> createScanExecutors(Instance instance,
+      Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) {
+    Builder<String,ExecutorService> builder = ImmutableMap.builder();
+
+    for (ScanExecutorConfig sec : scanExecCfg) {
+      builder.put(sec.name, createPriorityExecutor(sec, scanExecQueues));
+    }
+
+    return builder.build();
+  }
+
+  private static class ScanExecutorImpl implements ScanExecutor {
+
+    private static class ConfigImpl implements ScanExecutor.Config {
+
+      final ScanExecutorConfig cfg;
+
+      public ConfigImpl(ScanExecutorConfig sec) {
+        this.cfg = sec;
+      }
+
+      @Override
+      public String getName() {
+        return cfg.name;
       }
-      final int maxThreads = Integer.parseInt(entry.getValue());
-      final String tableId = Tables.getTableId(instance, tableName).canonicalID();
-      tableThreadPools.put(tableId,
-          createPriorityExecutor(Property.TSERV_READ_AHEAD_PREFIX, entry.getKey(), maxThreads,
-              Property.TSERV_SESSION_COMPARATOR_CLASS, tableName + " specific read ahead"));
+
+      @Override
+      public int getMaxThreads() {
+        return cfg.maxThreads;
+      }
+
+      @Override
+      public Optional<String> getPrioritizerClass() {
+        return cfg.prioritizerClass;
+      }
+
+      @Override
+      public Map<String,String> getPrioritizerOptions() {
+        return cfg.prioritizerOpts;
+      }
+
     }
+
+    private final ConfigImpl config;
+    private final Queue<?> queue;
+
+    ScanExecutorImpl(ScanExecutorConfig sec, Queue<?> q) {
+      this.config = new ConfigImpl(sec);
+      this.queue = q;
+    }
+
+    @Override
+    public int getQueued() {
+      return queue.size();
+    }
+
+    @Override
+    public Config getConfig() {
+      return config;
+    }
+
+  }
+
+  private Map<String,ScanExecutor> createScanExecutorChoices(
+      Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) {
+    Builder<String,ScanExecutor> builder = ImmutableMap.builder();
+
+    for (ScanExecutorConfig sec : scanExecCfg) {
+      builder.put(sec.name, new ScanExecutorImpl(sec, scanExecQueues.get(sec.name)));
+    }
+
+    return builder.build();
   }
 
   public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
@@ -390,11 +386,6 @@ public class TabletServerResourceManager {
 
     activeAssignments = new ConcurrentHashMap<>();
 
-    readAheadThreadPool = createPriorityExecutor(Property.TSERV_READ_AHEAD_MAXCONCURRENT,
-        Property.TSERV_SESSION_COMPARATOR_CLASS, "tablet read ahead");
-    defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
-        "metadata tablets read ahead");
-
     summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS,
         "summary file retriever", 60, TimeUnit.SECONDS);
     summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60,
@@ -402,13 +393,11 @@ public class TabletServerResourceManager {
     summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS,
         "summary partition", 60, TimeUnit.SECONDS);
 
-    try {
-      createTablePools(tserver.getInstance(), acuConf);
-    } catch (NamespaceNotFoundException e) {
-      throw new RuntimeException(e);
-    } catch (TableNotFoundException e) {
-      throw new RuntimeException(e);
-    }
+    Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors();
+    Map<String,Queue<?>> scanExecQueues = new HashMap<>();
+    scanExecutors = createScanExecutors(tserver.getInstance(), scanExecCfg, scanExecQueues);
+    scanExecutorChoices = createScanExecutorChoices(scanExecCfg, scanExecQueues);
+
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
     Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
@@ -938,16 +927,29 @@ public class TabletServerResourceManager {
     }
   }
 
-  public void executeReadAhead(KeyExtent tablet, Runnable task) {
-    ExecutorService service = tableThreadPools.get(tablet.getTableId().canonicalID());
-    if (null != service) {
-      service.execute(task);
-    } else if (tablet.isRootTablet()) {
+  public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, ScanSession scanInfo,
+      Runnable task) {
+
+    task = ScanSession.wrap(scanInfo, task);
+
+    if (tablet.isRootTablet()) {
       task.run();
     } else if (tablet.isMeta()) {
-      defaultReadAheadThreadPool.execute(task);
+      scanExecutors.get("meta").execute(task);
     } else {
-      readAheadThreadPool.execute(task);
+      String scanExecutorName = dispatcher.dispatch(scanInfo, scanExecutorChoices);
+      ExecutorService executor = scanExecutors.get(scanExecutorName);
+      if (executor == null) {
+        log.warn(
+            "For table id {}, {} dispatched to non-existant executor {} Using default executor.",
+            tablet.getTableId(), dispatcher.getClass().getName(), scanExecutorName);
+        executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+      } else if ("meta".equals(scanExecutorName)) {
+        log.warn("For table id {}, {} dispatched to meta executor. Using default executor.",
+            tablet.getTableId(), dispatcher.getClass().getName());
+        executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+      }
+      executor.execute(task);
     }
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index aee4477..16b9c55 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -23,7 +23,7 @@ import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.TooManyFilesException;
-import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.SingleScanSession;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.accumulo.tserver.tablet.TabletClosedException;
@@ -48,7 +48,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> {
   @Override
   public void run() {
 
-    final ScanSession scanSession = (ScanSession) server.getSession(scanID);
+    final SingleScanSession scanSession = (SingleScanSession) server.getSession(scanID);
     String oldThreadName = Thread.currentThread().getName();
 
     try {
@@ -69,10 +69,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> {
         return;
       }
 
-      long t1 = System.currentTimeMillis();
       ScanBatch batch = scanSession.scanner.read();
-      long t2 = System.currentTimeMillis();
-      scanSession.nbTimes.addStat(t2 - t1);
 
       // there should only be one thing on the queue at a time, so
       // it should be ok to call add()
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
deleted file mode 100644
index d584242..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.session;
-
-public class DefaultSessionComparator extends SessionComparator {
-
-  @Override
-  public int compareSession(Session sessionA, Session sessionB) {
-
-    final long startTimeFirst = sessionA.startTime;
-    final long startTimeSecond = sessionB.startTime;
-
-    // use the lowest max idle time
-    final long maxIdle = sessionA.maxIdleAccessTime < sessionB.maxIdleAccessTime
-        ? sessionA.maxIdleAccessTime
-        : sessionB.maxIdleAccessTime;
-
-    final long currentTime = System.currentTimeMillis();
-
-    /*
-     * Multiply by -1 so that we have a sensical comparison. This means that if comparison < 0,
-     * sessionA is newer. If comparison > 0, this means that session B is newer
-     */
-    int comparison = -1 * Long.compare(startTimeFirst, startTimeSecond);
-
-    if (!(sessionA.lastExecTime == -1 && sessionB.lastExecTime == -1)) {
-      if (comparison >= 0) {
-        long idleTimeA = currentTime - sessionA.lastExecTime;
-
-        /*
-         * If session B is newer, let's make sure that we haven't reached the max idle time, where
-         * we have to begin aging A
-         */
-        if (idleTimeA > sessionA.maxIdleAccessTime) {
-          comparison = -1 * Long.valueOf(idleTimeA - maxIdle).intValue();
-        }
-      } else {
-        long idleTimeB = currentTime - sessionB.lastExecTime;
-
-        /*
-         * If session A is newer, let's make sure that B hasn't reached the max idle time, where we
-         * have to begin aging A
-         */
-        if (idleTimeB > sessionA.maxIdleAccessTime) {
-          comparison = 1 * Long.valueOf(idleTimeB - maxIdle).intValue();
-        }
-      }
-    }
-
-    return comparison;
-  }
-
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 981832a..f8db1f4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -21,7 +21,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
@@ -30,13 +29,9 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.tserver.scan.ScanTask;
 
-public class MultiScanSession extends Session {
+public class MultiScanSession extends ScanSession {
   public final KeyExtent threadPoolExtent;
-  public final HashSet<Column> columnSet = new HashSet<>();
   public final Map<KeyExtent,List<Range>> queries;
-  public final List<IterInfo> ssiList;
-  public final Map<String,Map<String,String>> ssio;
-  public final Authorizations auths;
   public final SamplerConfiguration samplerConfig;
   public final long batchTimeOut;
   public final String context;
@@ -53,11 +48,8 @@ public class MultiScanSession extends Session {
       Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
       Map<String,Map<String,String>> ssio, Authorizations authorizations,
       SamplerConfiguration samplerConfig, long batchTimeOut, String context) {
-    super(credentials);
+    super(credentials, new HashSet<>(), ssiList, ssio, authorizations);
     this.queries = queries;
-    this.ssiList = ssiList;
-    this.ssio = ssio;
-    this.auths = authorizations;
     this.threadPoolExtent = threadPoolExtent;
     this.samplerConfig = samplerConfig;
     this.batchTimeOut = batchTimeOut;
@@ -65,20 +57,20 @@ public class MultiScanSession extends Session {
   }
 
   @Override
+  public Type getScanType() {
+    return Type.MULTI;
+  }
+
+  @Override
+  public String getTableId() {
+    return threadPoolExtent.getTableId().canonicalID();
+  }
+
+  @Override
   public boolean cleanup() {
     if (lookupTask != null)
       lookupTask.cancel(true);
     // the cancellation should provide us the safety to return true here
     return true;
   }
-
-  /**
-   * Ensure that the runnable actually runs
-   */
-  @Override
-  public void run() {
-    super.run();
-    lookupTask.run();
-  }
-
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index a55de1e..5cde06c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -16,64 +16,148 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.Column;
-import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
 import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.tserver.scan.ScanTask;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
-import org.apache.accumulo.tserver.tablet.Scanner;
-
-public class ScanSession extends Session {
-  public final Stat nbTimes = new Stat();
-  public final KeyExtent extent;
-  public final Set<Column> columnSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public abstract class ScanSession extends Session implements ScanInfo {
+
+  public static class ScanMeasurer implements Runnable {
+
+    private ScanSession session;
+    private Runnable task;
+
+    ScanMeasurer(ScanSession session, Runnable task) {
+      this.session = session;
+      this.task = task;
+    }
+
+    @Override
+    public void run() {
+      long t1 = System.currentTimeMillis();
+      task.run();
+      long t2 = System.currentTimeMillis();
+      session.finishedRun(t1, t2);
+    }
+
+    public ScanInfo getScanInfo() {
+      return session;
+    }
+  }
+
+  public static ScanMeasurer wrap(ScanSession scanInfo, Runnable r) {
+    return new ScanMeasurer(scanInfo, r);
+  }
+
+  private OptionalLong lastRunTime = OptionalLong.empty();
+  private Stat idleStats = new Stat();
+  public Stat runStats = new Stat();
+
+  public final HashSet<Column> columnSet;
   public final List<IterInfo> ssiList;
   public final Map<String,Map<String,String>> ssio;
   public final Authorizations auths;
-  public final AtomicBoolean interruptFlag = new AtomicBoolean();
-  public long entriesReturned = 0;
-  public long batchCount = 0;
-  public volatile ScanTask<ScanBatch> nextBatchTask;
-  public Scanner scanner;
-  public final long readaheadThreshold;
-  public final long batchTimeOut;
-  public final String context;
-
-  public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet,
-      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations,
-      long readaheadThreshold, long batchTimeOut, String context) {
+
+  ScanSession(TCredentials credentials, HashSet<Column> cols, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, Authorizations auths) {
     super(credentials);
-    this.extent = extent;
-    this.columnSet = columnSet;
+    this.columnSet = cols;
     this.ssiList = ssiList;
     this.ssio = ssio;
-    this.auths = authorizations;
-    this.readaheadThreshold = readaheadThreshold;
-    this.batchTimeOut = batchTimeOut;
-    this.context = context;
+    this.auths = auths;
+  }
+
+  @Override
+  public long getCreationTime() {
+    return startTime;
+  }
+
+  @Override
+  public OptionalLong getLastRunTime() {
+    return lastRunTime;
   }
 
   @Override
-  public boolean cleanup() {
-    final boolean ret;
-    try {
-      if (nextBatchTask != null)
-        nextBatchTask.cancel(true);
-    } finally {
-      if (scanner != null)
-        ret = scanner.close();
-      else
-        ret = true;
+  public Stats getRunTimeStats() {
+    return runStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats() {
+    return idleStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats(long currentTime) {
+    long idleTime = currentTime - getLastRunTime().orElse(getCreationTime());
+    Preconditions.checkArgument(idleTime >= 0);
+    Stat copy = idleStats.copy();
+    copy.addStat(idleTime);
+    return copy;
+  }
+
+  @Override
+  public Set<Column> getFetchedColumns() {
+    return Collections.unmodifiableSet(columnSet);
+  }
+
+  private class IterConfImpl implements IteratorConfiguration {
+
+    private IterInfo ii;
+
+    IterConfImpl(IterInfo ii) {
+      this.ii = ii;
+    }
+
+    @Override
+    public String getIteratorClass() {
+      return ii.className;
+    }
+
+    @Override
+    public String getName() {
+      return ii.iterName;
+    }
+
+    @Override
+    public int getPriority() {
+      return ii.priority;
+    }
+
+    @Override
+    public Map<String,String> getOptions() {
+      Map<String,String> opts = ssio.get(ii.iterName);
+      return opts == null || opts.isEmpty() ? Collections.emptyMap()
+          : Collections.unmodifiableMap(opts);
     }
-    return ret;
   }
 
+  @Override
+  public Collection<IteratorConfiguration> getClientScanIterators() {
+    return Lists.transform(ssiList, IterConfImpl::new);
+  }
+
+  public void finishedRun(long start, long finish) {
+    long idleTime = start - getLastRunTime().orElse(getCreationTime());
+    long runTime = finish - start;
+    lastRunTime = OptionalLong.of(finish);
+    idleStats.addStat(idleTime);
+    runStats.addStat(runTime);
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 32f7e34..6bd52e4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -19,7 +19,7 @@ package org.apache.accumulo.tserver.session;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.rpc.TServerUtils;
 
-public abstract class Session implements Runnable {
+public class Session {
 
   enum State {
     NEW, UNRESERVED, RESERVED, REMOVED
@@ -27,9 +27,7 @@ public abstract class Session implements Runnable {
 
   public final String client;
   public long lastAccessTime;
-  protected volatile long lastExecTime = -1;
   public long startTime;
-  public long maxIdleAccessTime;
   State state = State.NEW;
   private final TCredentials credentials;
 
@@ -49,18 +47,4 @@ public abstract class Session implements Runnable {
   public boolean cleanup() {
     return true;
   }
-
-  @Override
-  public void run() {
-    lastExecTime = System.currentTimeMillis();
-  }
-
-  public void setLastExecutionTime(long lastExecTime) {
-    this.lastExecTime = lastExecTime;
-  }
-
-  public long getLastExecutionTime() {
-    return lastExecTime;
-  }
-
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index c485698..1ba2491 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -231,9 +231,6 @@ public class SessionManager {
             configuredIdle = maxUpdateIdle;
           }
           long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-          if (idleTime > session.maxIdleAccessTime) {
-            session.maxIdleAccessTime = idleTime;
-          }
           if (idleTime > configuredIdle) {
             log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(),
                 session.client, idleTime);
@@ -318,8 +315,8 @@ public class SessionManager {
       ScanTask nbt = null;
       Table.ID tableID = null;
 
-      if (session instanceof ScanSession) {
-        ScanSession ss = (ScanSession) session;
+      if (session instanceof SingleScanSession) {
+        SingleScanSession ss = (SingleScanSession) session;
         nbt = ss.nextBatchTask;
         tableID = ss.extent.getTableId();
       } else if (session instanceof MultiScanSession) {
@@ -365,8 +362,8 @@ public class SessionManager {
 
     for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
       Session session = entry.getValue();
-      if (session instanceof ScanSession) {
-        ScanSession ss = (ScanSession) session;
+      if (session instanceof SingleScanSession) {
+        SingleScanSession ss = (SingleScanSession) session;
 
         ScanState state = ScanState.RUNNING;
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
similarity index 80%
copy from server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
copy to server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
index a55de1e..fb3e29f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
@@ -16,9 +16,9 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.Column;
@@ -26,18 +26,12 @@ import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.Stat;
 import org.apache.accumulo.tserver.scan.ScanTask;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Scanner;
 
-public class ScanSession extends Session {
-  public final Stat nbTimes = new Stat();
+public class SingleScanSession extends ScanSession {
   public final KeyExtent extent;
-  public final Set<Column> columnSet;
-  public final List<IterInfo> ssiList;
-  public final Map<String,Map<String,String>> ssio;
-  public final Authorizations auths;
   public final AtomicBoolean interruptFlag = new AtomicBoolean();
   public long entriesReturned = 0;
   public long batchCount = 0;
@@ -47,21 +41,27 @@ public class ScanSession extends Session {
   public final long batchTimeOut;
   public final String context;
 
-  public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet,
+  public SingleScanSession(TCredentials credentials, KeyExtent extent, HashSet<Column> columnSet,
       List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations,
       long readaheadThreshold, long batchTimeOut, String context) {
-    super(credentials);
+    super(credentials, columnSet, ssiList, ssio, authorizations);
     this.extent = extent;
-    this.columnSet = columnSet;
-    this.ssiList = ssiList;
-    this.ssio = ssio;
-    this.auths = authorizations;
     this.readaheadThreshold = readaheadThreshold;
     this.batchTimeOut = batchTimeOut;
     this.context = context;
   }
 
   @Override
+  public Type getScanType() {
+    return Type.SINGLE;
+  }
+
+  @Override
+  public String getTableId() {
+    return extent.getTableId().canonicalID();
+  }
+
+  @Override
   public boolean cleanup() {
     final boolean ret;
     try {
@@ -75,5 +75,4 @@ public class ScanSession extends Session {
     }
     return ret;
   }
-
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 18da8a4..dc5d684 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1540,7 +1540,8 @@ public class Tablet implements TabletCommitter {
     try {
       getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this, reason));
     } catch (RuntimeException t) {
-      log.debug("removing {} because we encountered an exception enqueing the CompactionRunner", reason, t);
+      log.debug("removing {} because we encountered an exception enqueing the CompactionRunner",
+          reason, t);
       majorCompactionQueued.remove(reason);
       throw t;
     }
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
deleted file mode 100644
index f7cc5cd..0000000
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.session;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-public class SessionComparatorTest {
-
-  @Test
-  public void testSingleScanMultiScanNoRun() {
-    long time = System.currentTimeMillis();
-    ScanSession sessionA = emptyScanSession();
-    sessionA.lastAccessTime = 0;
-    sessionA.maxIdleAccessTime = 0;
-    sessionA.startTime = time - 1000;
-
-    MultiScanSession sessionB = emptyMultiScanSession();
-    sessionB.lastAccessTime = 0;
-    sessionB.maxIdleAccessTime = 1000;
-    sessionB.startTime = time - 800;
-
-    ScanSession sessionC = emptyScanSession();
-    sessionC.lastAccessTime = 0;
-    sessionC.maxIdleAccessTime = 1000;
-    sessionC.startTime = time - 800;
-
-    // a has never run, so it should be given priority
-    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
-    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    assertEquals(1, comparator.compareSession(sessionB, sessionA));
-
-    // now let's assume they have been executed
-
-    assertEquals(1, comparator.compareSession(sessionA, sessionC));
-
-    assertEquals(0, comparator.compareSession(sessionC, sessionC));
-
-  }
-
-  @Test
-  public void testSingleScanRun() {
-    long time = System.currentTimeMillis();
-    ScanSession sessionA = emptyScanSession();
-    sessionA.lastAccessTime = 0;
-    sessionA.setLastExecutionTime(time);
-    sessionA.maxIdleAccessTime = 1000;
-    sessionA.startTime = time - 1000;
-
-    ScanSession sessionB = emptyScanSession();
-    sessionB.lastAccessTime = 0;
-    sessionB.setLastExecutionTime(time - 2000);
-    sessionB.maxIdleAccessTime = 1000;
-    sessionB.startTime = time - 800;
-
-    // b is newer
-    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
-    assertEquals(1, comparator.compareSession(sessionA, sessionB));
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
-
-    sessionB.setLastExecutionTime(time);
-    sessionA.setLastExecutionTime(time - 2000);
-
-    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    int comp = comparator.compareSession(sessionB, sessionA);
-    assertTrue("comparison is " + comp, comp >= 1);
-  }
-
-  @Test
-  public void testSingleScanMultiScanRun() {
-    long time = System.currentTimeMillis();
-    ScanSession sessionA = emptyScanSession();
-    sessionA.lastAccessTime = 0;
-    sessionA.setLastExecutionTime(time);
-    sessionA.maxIdleAccessTime = 1000;
-    sessionA.startTime = time - 1000;
-
-    MultiScanSession sessionB = emptyMultiScanSession();
-    sessionB.lastAccessTime = 0;
-    sessionB.setLastExecutionTime(time - 2000);
-    sessionB.maxIdleAccessTime = 1000;
-    sessionB.startTime = time - 800;
-
-    // b is newer
-    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
-    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    assertTrue(comparator.compareSession(sessionB, sessionA) > 0);
-
-    sessionB.setLastExecutionTime(time);
-    sessionA.setLastExecutionTime(time - 2000);
-
-    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    int comp = comparator.compareSession(sessionB, sessionA);
-    assertTrue("comparison is " + comp, comp > 0);
-  }
-
-  @Test
-  public void testMultiScanRun() {
-    long time = System.currentTimeMillis();
-    ScanSession sessionA = emptyScanSession();
-    sessionA.lastAccessTime = 0;
-    sessionA.setLastExecutionTime(time);
-    sessionA.maxIdleAccessTime = 1000;
-    sessionA.startTime = time - 1000;
-
-    ScanSession sessionB = emptyScanSession();
-    sessionB.lastAccessTime = 0;
-    sessionB.setLastExecutionTime(time - 2000);
-    sessionB.maxIdleAccessTime = 1000;
-    sessionB.startTime = time - 800;
-
-    // b is newer
-    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
-    assertEquals(1, comparator.compareSession(sessionA, sessionB));
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
-
-    sessionB.setLastExecutionTime(time);
-    sessionA.setLastExecutionTime(time - 2000);
-
-    
-    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
-    // b is before a in queue, b has never run, but because a is single
-    // we should be given priority
-    int comp = comparator.compareSession(sessionB, sessionA);
-    assertTrue("comparison is " + comp, comp >= 1);
-  }
-
-  private static ScanSession emptyScanSession() {
-    return new ScanSession(null, null, null, null, null, null, 0, 0, null);
-  }
-
-  private static MultiScanSession emptyMultiScanSession() {
-    return new MultiScanSession(null, null, null, null, null, null, null, 0, null);
-  }
-}
diff --git a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
index e807dfb..320b9c2 100644
--- a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
+++ b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
@@ -68,7 +68,7 @@ public class IMMLGBenchmark {
     }
 
     for (Entry<String,Stat> entry : stats.entrySet()) {
-      System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().getAverage());
+      System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().mean());
     }
 
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index cbeb1e4..907f2c6 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -606,9 +606,8 @@ public class CollectTabletStats {
   }
 
   private static void printStat(String desc, Stat s) {
-    System.out.printf(
-        "\t\tDescription: [%30s]  average: %,6.2f  std dev: %,6.2f  min: %,d  max: %,d %n", desc,
-        s.getAverage(), s.getStdDev(), s.getMin(), s.getMax());
+    System.out.printf("\t\tDescription: [%30s]  average: %,6.2f min: %,d  max: %,d %n", desc,
+        s.mean(), s.min(), s.max());
 
   }