You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/03/20 14:49:05 UTC

[9/9] accumulo git commit: ACCUMULO-4501 ACCUMULO-96 Added Summarization

ACCUMULO-4501 ACCUMULO-96 Added Summarization

closes apache/accumulo#224
closes apache/accumulo#168


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/94cdcc4d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/94cdcc4d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/94cdcc4d

Branch: refs/heads/master
Commit: 94cdcc4d3f0a8ccf95894f206cb71e6117f4e51d
Parents: 68ba2ef
Author: Keith Turner <kt...@apache.org>
Authored: Mon Mar 20 10:47:00 2017 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Mar 20 10:47:00 2017 -0400

----------------------------------------------------------------------
 .../client/admin/NewTableConfiguration.java     |   49 +-
 .../core/client/admin/SummaryRetriever.java     |  112 +
 .../core/client/admin/TableOperations.java      |   65 +
 .../accumulo/core/client/impl/ServerClient.java |   36 +-
 .../core/client/impl/TableOperationsImpl.java   |  151 +-
 .../client/mapred/AccumuloFileOutputFormat.java |   16 +
 .../mapreduce/AccumuloFileOutputFormat.java     |   16 +
 .../lib/impl/FileOutputConfigurator.java        |    9 +
 .../core/client/mock/MockTableOperations.java   |   27 +
 .../accumulo/core/client/rfile/RFile.java       |  129 +
 .../core/client/rfile/RFileScannerBuilder.java  |    2 +
 .../client/rfile/RFileSummariesRetriever.java   |  122 +
 .../accumulo/core/client/rfile/RFileWriter.java |   26 +
 .../core/client/rfile/RFileWriterBuilder.java   |   38 +-
 .../core/client/summary/CounterSummary.java     |  123 +
 .../core/client/summary/CountingSummarizer.java |  302 +
 .../core/client/summary/Summarizer.java         |  227 +
 .../client/summary/SummarizerConfiguration.java |  285 +
 .../accumulo/core/client/summary/Summary.java   |  145 +
 .../summary/summarizers/DeletesSummarizer.java  |   75 +
 .../summary/summarizers/FamilySummarizer.java   |   46 +
 .../summarizers/VisibilitySummarizer.java       |   47 +
 .../core/compaction/CompactionSettings.java     |    2 +
 .../org/apache/accumulo/core/conf/Property.java |   22 +-
 .../apache/accumulo/core/conf/PropertyType.java |    9 +-
 .../accumulo/core/data/ArrayByteSequence.java   |   18 +
 .../accumulo/core/data/thrift/TRowRange.java    |  521 ++
 .../accumulo/core/data/thrift/TSummaries.java   |  831 +++
 .../data/thrift/TSummarizerConfiguration.java   |  649 ++
 .../accumulo/core/data/thrift/TSummary.java     |  842 +++
 .../core/data/thrift/TSummaryRequest.java       |  760 +++
 .../accumulo/core/file/BloomFilterLayer.java    |   10 +-
 .../core/file/DispatchingFileFactory.java       |    7 +-
 .../accumulo/core/file/FileOperations.java      |   18 +
 .../accumulo/core/file/rfile/PrintInfo.java     |    7 +
 .../core/file/rfile/RFileOperations.java        |    2 +-
 .../core/metadata/schema/MetadataScanner.java   |  236 +
 .../core/metadata/schema/TabletMetadata.java    |  182 +
 .../sample/impl/SamplerConfigurationImpl.java   |   12 -
 .../core/sample/impl/SamplerFactory.java        |    8 +-
 .../accumulo/core/security/TablePermission.java |    5 +-
 .../apache/accumulo/core/summary/Gatherer.java  |  631 ++
 .../summary/SummarizerConfigurationUtil.java    |  128 +
 .../core/summary/SummarizerFactory.java         |   63 +
 .../core/summary/SummaryCollection.java         |  188 +
 .../accumulo/core/summary/SummaryInfo.java      |   53 +
 .../accumulo/core/summary/SummaryReader.java    |  257 +
 .../core/summary/SummarySerializer.java         |  542 ++
 .../accumulo/core/summary/SummaryWriter.java    |  157 +
 .../thrift/TabletClientService.java             | 5642 +++++++++++++++++-
 .../accumulo/core/util/CancelFlagFuture.java    |   67 +
 .../core/util/CompletableFutureUtil.java        |   49 +
 core/src/main/thrift/data.thrift                |   34 +
 core/src/main/thrift/tabletserver.thrift        |    5 +
 .../client/impl/TableOperationsHelperTest.java  |   26 +
 .../mapred/AccumuloFileOutputFormatTest.java    |   18 +
 .../mapreduce/AccumuloFileOutputFormatTest.java |   18 +
 .../accumulo/core/client/rfile/RFileTest.java   |  158 +-
 .../client/summary/CountingSummarizerTest.java  |  259 +
 .../core/summary/SummaryCollectionTest.java     |   72 +
 .../core/util/CompletableFutureUtilTest.java    |   53 +
 .../main/asciidoc/accumulo_user_manual.asciidoc |    2 +
 docs/src/main/asciidoc/chapters/summaries.txt   |  232 +
 .../standalone/StandaloneAccumuloCluster.java   |    3 +-
 .../standalone/StandaloneClusterControl.java    |    3 +-
 .../impl/MiniAccumuloConfigImpl.java            |    3 +-
 .../server/security/SecurityOperation.java      |    5 +
 .../apache/accumulo/tserver/TabletServer.java   |  115 +
 .../tserver/TabletServerResourceManager.java    |   49 +-
 .../tserver/compaction/CompactionStrategy.java  |    1 -
 .../compaction/MajorCompactionRequest.java      |   84 +-
 .../ConfigurableCompactionStrategy.java         |   99 +-
 .../TooManyDeletesCompactionStrategy.java       |  173 +
 .../tserver/session/SummarySession.java         |   42 +
 .../apache/accumulo/tserver/tablet/Tablet.java  |    9 +-
 .../DefaultCompactionStrategyTest.java          |    2 +-
 .../SizeLimitCompactionStrategyTest.java        |    2 +-
 .../TwoTierCompactionStrategyTest.java          |    6 +-
 .../ConfigurableCompactionStrategyTest.java     |    2 +-
 .../java/org/apache/accumulo/shell/Shell.java   |    4 +-
 .../accumulo/shell/commands/CompactCommand.java |    8 +-
 .../shell/commands/SummariesCommand.java        |  115 +
 .../org/apache/accumulo/test/ShellServerIT.java |  129 +-
 .../test/functional/BasicSummarizer.java        |   80 +
 .../accumulo/test/functional/SummaryIT.java     |  820 +++
 .../test/functional/TooManyDeletesIT.java       |  121 +
 .../test/performance/thrift/NullTserver.java    |   33 +-
 87 files changed, 16482 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
index 4694e1e..9d5d31a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
@@ -19,14 +19,18 @@ package org.apache.accumulo.core.client.admin;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.summary.SummarizerConfigurationUtil;
 
 /**
  * This object stores table creation parameters. Currently includes: {@link TimeType}, whether to include default iterators, and user-specified initial
@@ -41,8 +45,13 @@ public class NewTableConfiguration {
 
   private boolean limitVersion = true;
 
-  private Map<String,String> properties = new HashMap<>();
-  private SamplerConfiguration samplerConfiguration;
+  private Map<String,String> properties = Collections.emptyMap();
+  private Map<String,String> samplerProps = Collections.emptyMap();
+  private Map<String,String> summarizerProps = Collections.emptyMap();
+
+  private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, String kind) {
+    checkArgument(Collections.disjoint(props.keySet(), derivedProps.keySet()), "Properties and derived %s properties are not disjoint", kind);
+  }
 
   /**
    * Configure logical or millisecond time for tables created with this configuration.
@@ -82,15 +91,15 @@ public class NewTableConfiguration {
    * Sets additional properties to be applied to tables created with this configuration. Additional calls to this method replaces properties set by previous
    * calls.
    *
-   * @param prop
+   * @param props
    *          additional properties to add to the table when it is created
    * @return this
    */
-  public NewTableConfiguration setProperties(Map<String,String> prop) {
-    checkArgument(prop != null, "properties is null");
-    SamplerConfigurationImpl.checkDisjoint(prop, samplerConfiguration);
-
-    this.properties = new HashMap<>(prop);
+  public NewTableConfiguration setProperties(Map<String,String> props) {
+    checkArgument(props != null, "properties is null");
+    checkDisjoint(props, samplerProps, "sampler");
+    checkDisjoint(props, summarizerProps, "summarizer");
+    this.properties = new HashMap<>(props);
     return this;
   }
 
@@ -106,10 +115,8 @@ public class NewTableConfiguration {
       propertyMap.putAll(IteratorUtil.generateInitialTableProperties(limitVersion));
     }
 
-    if (samplerConfiguration != null) {
-      propertyMap.putAll(new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap());
-    }
-
+    propertyMap.putAll(summarizerProps);
+    propertyMap.putAll(samplerProps);
     propertyMap.putAll(properties);
     return Collections.unmodifiableMap(propertyMap);
   }
@@ -121,8 +128,22 @@ public class NewTableConfiguration {
    */
   public NewTableConfiguration enableSampling(SamplerConfiguration samplerConfiguration) {
     requireNonNull(samplerConfiguration);
-    SamplerConfigurationImpl.checkDisjoint(properties, samplerConfiguration);
-    this.samplerConfiguration = samplerConfiguration;
+    Map<String,String> tmp = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap();
+    checkDisjoint(properties, tmp, "sampler");
+    this.samplerProps = tmp;
+    return this;
+  }
+
+  /**
+   * Enables creating summary statistics using {@link Summarizer}s for the new table.
+   *
+   * @since 2.0.0
+   */
+  public NewTableConfiguration enableSummarization(SummarizerConfiguration... configs) {
+    requireNonNull(configs);
+    Map<String,String> tmp = SummarizerConfigurationUtil.toTablePropertiesMap(Arrays.asList(configs));
+    checkDisjoint(properties, tmp, "summarizer");
+    summarizerProps = tmp;
     return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java
new file mode 100644
index 0000000..8dcf048
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.admin;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This interface allows configuring where and which summary data to retrieve before retrieving it.
+ *
+ * @since 2.0.0
+ */
+public interface SummaryRetriever {
+
+  /**
+   * Forces a flush of data in memory to files before summary data is retrieved. Data recently written to Accumulo may be in memory. Summary data is only
+   * retrieved from files. Therefore recently written data may not be represented in summaries, unless this option is set to true. This is optional and
+   * defaults to false.
+   *
+   * @return this
+   */
+  SummaryRetriever flush(boolean shouldFlush);
+
+  /**
+   * The start row is not inclusive. Calling this method is optional.
+   */
+  SummaryRetriever startRow(Text startRow);
+
+  /**
+   * The start row is not inclusive. Calling this method is optional.
+   */
+  SummaryRetriever startRow(CharSequence startRow);
+
+  /**
+   * The end row is inclusive. Calling this method is optional.
+   */
+  SummaryRetriever endRow(Text endRow);
+
+  /**
+   * The end row is inclusive. Calling this method is optional.
+   */
+  SummaryRetriever endRow(CharSequence endRow);
+
+  /**
+   * Filters which summary data is retrieved. By default all summary data present is retrieved. If only a subset of summary data is needed, then it's best to
+   * be selective in order to avoid polluting the summary data cache.
+   *
+   * <p>
+   * Each set of summary data is generated using a specific {@link SummarizerConfiguration}. The methods {@link #withConfiguration(Collection)} and
+   * {@link #withConfiguration(SummarizerConfiguration...)} allow selecting sets of summary data based on exact {@link SummarizerConfiguration} matches. This
+   * method enables less exact matching using regular expressions.
+   *
+   * <p>
+   * The regular expression passed to this method is used in the following way on the server side to match {@link SummarizerConfiguration} objects. When a
+   * {@link SummarizerConfiguration} matches, the summary data generated using that configuration is returned.
+   *
+   * <pre>
+   * <code>
+   *    boolean doesConfigurationMatch(SummarizerConfiguration conf, String regex) {
+   *      // This is how conf is converted to a String in tablet servers for matching.
+   *      // The options are sorted to make writing regular expressions easier.
+   *      String confString = conf.getClassName()+" "+new TreeMap&lt;&gt;(conf.getOptions());
+   *      return Pattern.compile(regex).matcher(confString).matches();
+   *    }
+   * </code>
+   * </pre>
+   */
+  SummaryRetriever withMatchingConfiguration(String regex);
+
+  /**
+   * Allows specifying a set of summaries, generated using the specified configs, to retrieve. By default will retrieve all present.
+   *
+   * <p>
+   * Using this method to be more selective may pull less data into the tablet server's summary cache.
+   */
+  SummaryRetriever withConfiguration(SummarizerConfiguration... config);
+
+  /**
+   * Allows specifying a set of summaries, generated using the specified configs, to retrieve. By default will retrieve all present.
+   *
+   * <p>
+   * Using this method to be more selective may pull less data into the tablet server's summary cache.
+   */
+  SummaryRetriever withConfiguration(Collection<SummarizerConfiguration> configs);
+
+  /**
+   * @return the list of summaries retrieved using the options configured on this retriever
+   */
+  List<Summary> retrieve() throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index cabcfa3..f88d28e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.function.Predicate;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -34,9 +35,12 @@ import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -808,4 +812,65 @@ public interface TableOperations {
    * @since 1.8.0
    */
   SamplerConfiguration getSamplerConfiguration(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+
+  /**
+   * Entry point for retrieving summaries with optional restrictions.
+   *
+   * <p>
+   * In order to retrieve Summaries, the Accumulo user making the request will need the {@link TablePermission#GET_SUMMARIES} table permission.
+   *
+   * <p>
+   * Accumulo stores summary data with each file in each tablet. In order to make retrieving it faster there is a per tablet server cache of summary data. When
+   * summary data for a file is not present, it will be retrieved using threads on the tserver. The tablet server properties
+   * {@code tserver.summary.partition.threads}, {@code tserver.summary.remote.threads}, {@code tserver.summary.retrieval.threads}, and
+   * {@code tserver.cache.summary.size} impact the performance of retrieving summaries.
+   *
+   * <p>
+   * Since summary data is cached, it's important to use the summary selection options to only read the needed data into the cache.
+   *
+   * <p>
+   * Summary data will be merged on the tablet servers and then in this client process. Therefore it's important that the required summarizers are on the
+   * client's classpath.
+   *
+   * @since 2.0.0
+   * @see Summarizer
+   */
+  SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+
+  /**
+   * Enables summary generation for this table for future compactions.
+   *
+   * @param tableName
+   *          add summarizers to this table
+   * @param summarizers
+   *          summarizers to add
+   * @throws IllegalArgumentException
+   *           When new summarizers have the same property id as each other, or when the same summarizers were previously added.
+   * @since 2.0.0
+   * @see SummarizerConfiguration#toTableProperties()
+   * @see SummarizerConfiguration#toTableProperties(SummarizerConfiguration...)
+   * @see SummarizerConfiguration#toTableProperties(Collection)
+   */
+  void addSummarizers(String tableName, SummarizerConfiguration... summarizers) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+
+  /**
+   * Removes summary generation for this table for the matching summarizers.
+   *
+   * @param tableName
+   *          remove summarizers from this table
+   * @param predicate
+   *          removes all summarizers whose configuration matches this predicate
+   * @since 2.0.0
+   */
+  void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException,
+      AccumuloSecurityException;
+
+  /**
+   * @param tableName
+   *          list summarizers for this table
+   * @return the summarizers currently configured for the table
+   * @since 2.0.0
+   * @see SummarizerConfiguration#fromTableProperties(Map)
+   */
+  List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException;
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
index a4853f0..9d18f99 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
@@ -36,6 +36,9 @@ import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.TServiceClientFactory;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -47,8 +50,13 @@ public class ServerClient {
   private static final Logger log = LoggerFactory.getLogger(ServerClient.class);
 
   public static <T> T execute(ClientContext context, ClientExecReturn<T,ClientService.Client> exec) throws AccumuloException, AccumuloSecurityException {
+    return execute(context, new ClientService.Client.Factory(), exec);
+  }
+
+  public static <CT extends TServiceClient,RT> RT execute(ClientContext context, TServiceClientFactory<CT> factory, ClientExecReturn<RT,CT> exec)
+      throws AccumuloException, AccumuloSecurityException {
     try {
-      return executeRaw(context, exec);
+      return executeRaw(context, factory, exec);
     } catch (ThriftSecurityException e) {
       throw new AccumuloSecurityException(e.user, e.code, e);
     } catch (AccumuloException e) {
@@ -71,14 +79,21 @@ public class ServerClient {
   }
 
   public static <T> T executeRaw(ClientContext context, ClientExecReturn<T,ClientService.Client> exec) throws Exception {
+    return executeRaw(context, new ClientService.Client.Factory(), exec);
+  }
+
+  public static <CT extends TServiceClient,RT> RT executeRaw(ClientContext context, TServiceClientFactory<CT> factory, ClientExecReturn<RT,CT> exec)
+      throws Exception {
     while (true) {
-      ClientService.Client client = null;
+      CT client = null;
       String server = null;
       try {
-        Pair<String,Client> pair = ServerClient.getConnection(context);
+        Pair<String,CT> pair = ServerClient.getConnection(context, factory);
         server = pair.getFirst();
         client = pair.getSecond();
         return exec.execute(client);
+      } catch (TApplicationException tae) {
+        throw new AccumuloServerException(server, tae);
       } catch (TTransportException tte) {
         log.debug("ClientService request failed " + server + ", retrying ... ", tte);
         sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -99,6 +114,8 @@ public class ServerClient {
         client = pair.getSecond();
         exec.execute(client);
         break;
+      } catch (TApplicationException tae) {
+        throw new AccumuloServerException(server, tae);
       } catch (TTransportException tte) {
         log.debug("ClientService request failed " + server + ", retrying ... ", tte);
         sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -115,12 +132,21 @@ public class ServerClient {
     return getConnection(context, true);
   }
 
+  public static <CT extends TServiceClient> Pair<String,CT> getConnection(ClientContext context, TServiceClientFactory<CT> factory) throws TTransportException {
+    return getConnection(context, factory, true, context.getClientTimeoutInMillis());
+  }
+
   public static Pair<String,ClientService.Client> getConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException {
     return getConnection(context, preferCachedConnections, context.getClientTimeoutInMillis());
   }
 
   public static Pair<String,ClientService.Client> getConnection(ClientContext context, boolean preferCachedConnections, long rpcTimeout)
       throws TTransportException {
+    return getConnection(context, new ClientService.Client.Factory(), preferCachedConnections, rpcTimeout);
+  }
+
+  public static <CT extends TServiceClient> Pair<String,CT> getConnection(ClientContext context, TServiceClientFactory<CT> factory,
+      boolean preferCachedConnections, long rpcTimeout) throws TTransportException {
     checkArgument(context != null, "context is null");
     // create list of servers
     ArrayList<ThriftTransportKey> servers = new ArrayList<>();
@@ -141,7 +167,7 @@ public class ServerClient {
     boolean opened = false;
     try {
       Pair<String,TTransport> pair = ThriftTransportPool.getInstance().getAnyTransport(servers, preferCachedConnections);
-      ClientService.Client client = ThriftUtil.createClient(new ClientService.Client.Factory(), pair.getSecond());
+      CT client = ThriftUtil.createClient(factory, pair.getSecond());
       opened = true;
       warnedAboutTServersBeingDown = false;
       return new Pair<>(pair.getFirst(), client);
@@ -159,7 +185,7 @@ public class ServerClient {
     }
   }
 
-  public static void close(ClientService.Client client) {
+  public static void close(TServiceClient client) {
     if (client != null && client.getInputProtocol() != null && client.getInputProtocol().getTransport() != null) {
       ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport());
     } else {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index 1c04a43..34b76fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toSet;
 
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
@@ -38,6 +39,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
@@ -48,6 +50,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
@@ -69,16 +74,19 @@ import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.FindMax;
 import org.apache.accumulo.core.client.admin.Locations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SummaryRetriever;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
 import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.client.impl.thrift.TDiskUsage;
 import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
@@ -90,6 +98,10 @@ import org.apache.accumulo.core.data.TabletId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.impl.TabletIdImpl;
+import org.apache.accumulo.core.data.thrift.TRowRange;
+import org.apache.accumulo.core.data.thrift.TSummaries;
+import org.apache.accumulo.core.data.thrift.TSummarizerConfiguration;
+import org.apache.accumulo.core.data.thrift.TSummaryRequest;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -103,6 +115,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.summary.SummarizerConfigurationUtil;
+import org.apache.accumulo.core.summary.SummaryCollection;
 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Tracer;
@@ -126,6 +140,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
 public class TableOperationsImpl extends TableOperationsHelper {
@@ -1661,4 +1676,138 @@ public class TableOperationsImpl extends TableOperationsHelper {
 
     return new LoctionsImpl(binnedRanges);
   }
+
+  @Override
+  public SummaryRetriever summaries(String tableName) {
+    // Returns a retriever that collects the options set on it and only contacts a server when retrieve() is called.
+    return new SummaryRetriever() {
+
+      private Text startRow = null; // optional; null until startRow() is called
+      private Text endRow = null; // optional; null until endRow() is called
+      private List<TSummarizerConfiguration> summariesToFetch = Collections.emptyList(); // empty unless withConfiguration() is called
+      private String summarizerClassRegex; // null unless withMatchingConfiguration() is called
+      private boolean flush = false; // when true, retrieve() calls _flush() for the row range first
+
+      @Override
+      public SummaryRetriever startRow(Text startRow) {
+        Objects.requireNonNull(startRow);
+        if (endRow != null) { // only validate ordering once both bounds are known
+          Preconditions.checkArgument(startRow.compareTo(endRow) < 0, "Start row must be less than end row : %s >= %s", startRow, endRow);
+        }
+        this.startRow = startRow;
+        return this;
+      }
+
+      @Override
+      public SummaryRetriever startRow(CharSequence startRow) {
+        return startRow(new Text(startRow.toString())); // delegate so the Text overload's validation applies
+      }
+
+      @Override
+      public List<Summary> retrieve() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+        String tableId = Tables.getTableId(context.getInstance(), tableName);
+        if (Tables.getTableState(context.getInstance(), tableId) == TableState.OFFLINE) // offline tables are rejected up front
+          throw new TableOfflineException(context.getInstance(), tableId);
+
+        TRowRange range = new TRowRange(TextUtil.getByteBuffer(startRow), TextUtil.getByteBuffer(endRow));
+        TSummaryRequest request = new TSummaryRequest(tableId, range, summariesToFetch, summarizerClassRegex);
+        if (flush) {
+          _flush(tableId, startRow, endRow, true);
+        }
+        // Start a summary session, then keep polling that session id until the response is marked finished.
+        TSummaries ret = ServerClient.execute(context, new TabletClientService.Client.Factory(), client -> {
+          TSummaries tsr = client.startGetSummaries(Tracer.traceInfo(), context.rpcCreds(), request);
+          while (!tsr.finished) {
+            tsr = client.contiuneGetSummaries(Tracer.traceInfo(), tsr.sessionId); // (sic) presumably the generated RPC method's spelling
+          }
+          return tsr;
+        });
+        return new SummaryCollection(ret).getSummaries();
+      }
+
+      @Override
+      public SummaryRetriever endRow(Text endRow) {
+        Objects.requireNonNull(endRow);
+        if (startRow != null) { // only validate ordering once both bounds are known
+          Preconditions.checkArgument(startRow.compareTo(endRow) < 0, "Start row must be less than end row : %s >= %s", startRow, endRow);
+        }
+        this.endRow = endRow;
+        return this;
+      }
+
+      @Override
+      public SummaryRetriever endRow(CharSequence endRow) {
+        return endRow(new Text(endRow.toString())); // delegate so the Text overload's validation applies
+      }
+
+      @Override
+      public SummaryRetriever withConfiguration(Collection<SummarizerConfiguration> configs) {
+        Objects.requireNonNull(configs);
+        summariesToFetch = configs.stream().map(SummarizerConfigurationUtil::toThrift).collect(Collectors.toList()); // replaces any earlier selection
+        return this;
+      }
+
+      @Override
+      public SummaryRetriever withConfiguration(SummarizerConfiguration... config) {
+        Objects.requireNonNull(config);
+        return withConfiguration(Arrays.asList(config));
+      }
+
+      @Override
+      public SummaryRetriever withMatchingConfiguration(String regex) {
+        Objects.requireNonNull(regex);
+        // Do a sanity check here to make sure that regex compiles, instead of having it fail on a tserver.
+        Pattern.compile(regex);
+        this.summarizerClassRegex = regex;
+        return this;
+      }
+
+      @Override
+      public SummaryRetriever flush(boolean b) {
+        this.flush = b;
+        return this;
+      }
+    };
+  }
+
+  @Override
+  public void addSummarizers(String tableName, SummarizerConfiguration... newConfigs) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    HashSet<SummarizerConfiguration> currentConfigs = new HashSet<>(SummarizerConfiguration.fromTableProperties(getProperties(tableName)));
+    HashSet<SummarizerConfiguration> newConfigSet = new HashSet<>(Arrays.asList(newConfigs));
+    // Configurations that are already present on the table are dropped, so re-adding an identical one is a no-op.
+    newConfigSet.removeIf(sc -> currentConfigs.contains(sc));
+    // Property ids that the remaining, genuinely new configurations would use.
+    Set<String> newIds = newConfigSet.stream().map(sc -> sc.getPropertyId()).collect(toSet());
+    // Reject the request when a different existing configuration already uses one of those ids.
+    for (SummarizerConfiguration csc : currentConfigs) {
+      if (newIds.contains(csc.getPropertyId())) {
+        throw new IllegalArgumentException("Summarizer property id is in use by " + csc);
+      }
+    }
+    // Persist the new configurations as table properties, one setProperty call per entry.
+    Set<Entry<String,String>> es = SummarizerConfiguration.toTableProperties(newConfigSet).entrySet();
+    for (Entry<String,String> entry : es) {
+      setProperty(tableName, entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException,
+      AccumuloSecurityException {
+    Collection<SummarizerConfiguration> summarizerConfigs = SummarizerConfiguration.fromTableProperties(getProperties(tableName));
+    for (SummarizerConfiguration sc : summarizerConfigs) {
+      if (predicate.test(sc)) { // only summarizers selected by the caller's predicate are removed
+        Set<String> ks = sc.toTableProperties().keySet(); // every table property key this summarizer is stored under
+        for (String key : ks) {
+          removeProperty(tableName, key);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+    return new ArrayList<>(SummarizerConfiguration.fromTableProperties(getProperties(tableName))); // parsed from the table's properties, copied into a new list
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index 640a85d..d7d2b2d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -23,6 +23,8 @@ import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -136,6 +138,20 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
     FileOutputConfigurator.setSampler(CLASS, job, samplerConfig);
   }
 
+  /**
+   * Specifies a list of summarizer configurations to create summary data in the output file. Each key/value pair written will be passed to the configured
+   * {@link Summarizer}s.
+   *
+   * @param job
+   *          The Hadoop job instance to be configured
+   * @param sumarizerConfigs
+   *          summarizer configurations
+   * @since 2.0.0
+   */
+  public static void setSummarizers(JobConf job, SummarizerConfiguration... sumarizerConfigs) {
+    FileOutputConfigurator.setSummarizers(CLASS, job, sumarizerConfigs);
+  }
+
   @Override
   public RecordWriter<Key,Value> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
     // get the path of the temporary output file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index 656dba7..dcdd42b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -23,6 +23,8 @@ import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -134,6 +136,20 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
     FileOutputConfigurator.setSampler(CLASS, job.getConfiguration(), samplerConfig);
   }
 
+  /**
+   * Specifies a list of summarizer configurations to create summary data in the output file. Each key/value pair written will be passed to the configured
+   * {@link Summarizer}s.
+   *
+   * @param job
+   *          The Hadoop job instance to be configured
+   * @param sumarizerConfigs
+   *          summarizer configurations
+   * @since 2.0.0
+   */
+  public static void setSummarizers(Job job, SummarizerConfiguration... sumarizerConfigs) {
+    FileOutputConfigurator.setSummarizers(CLASS, job.getConfiguration(), sumarizerConfigs);
+  }
+
   @Override
   public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException {
     // get the path of the temporary output file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
index 049395f..5f73e90 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
@@ -22,6 +22,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
@@ -209,4 +210,12 @@ public class FileOutputConfigurator extends ConfiguratorBase {
     }
   }
 
+  public static void setSummarizers(Class<?> implementingClass, Configuration conf, SummarizerConfiguration[] sumarizerConfigs) {
+    Map<String,String> props = SummarizerConfiguration.toTableProperties(sumarizerConfigs);
+    // Copy each table property into the job configuration under the ACCUMULO_PROPERTIES key prefix for implementingClass.
+    for (Entry<String,String> entry : props.entrySet()) {
+      conf.set(enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + "." + entry.getKey(), entry.getValue());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index de89137..de486d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.function.Predicate;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -41,10 +42,12 @@ import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.FindMax;
 import org.apache.accumulo.core.client.admin.Locations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SummaryRetriever;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.client.impl.TableOperationsHelper;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -502,4 +505,28 @@ class MockTableOperations extends TableOperationsHelper {
   public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    throw new UnsupportedOperationException(); // summaries are not implemented by this mock
+  }
+
+  @Override
+  public void addSummarizers(String tableName, SummarizerConfiguration... summarizerConf) throws TableNotFoundException, AccumuloException,
+      AccumuloSecurityException {
+    throw new UnsupportedOperationException(); // summarizers are not implemented by this mock
+
+  }
+
+  @Override
+  public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException,
+      AccumuloSecurityException {
+    throw new UnsupportedOperationException(); // summarizers are not implemented by this mock
+
+  }
+
+  @Override
+  public List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+    throw new UnsupportedOperationException(); // summarizers are not implemented by this mock
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
index bc5995e..7c3f70e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
@@ -19,12 +19,18 @@ package org.apache.accumulo.core.client.rfile;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.function.Predicate;
 
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.security.Authorizations;
@@ -181,6 +187,119 @@ public class RFile {
   }
 
   /**
+   * This is an intermediate interface in a larger builder pattern. Supports setting the required input sources for reading summary data from RFiles.
+   *
+   * @since 2.0.0
+   */
+  public static interface SummaryInputArguments {
+    /**
+     * Specify RFiles to read from. When multiple inputs are specified the summary data will be merged.
+     *
+     * @param inputs
+     *          one or more RFiles to read.
+     * @return this
+     */
+    SummaryOptions from(RFileSource... inputs);
+
+    /**
+     * Specify RFiles to read from. When multiple files are specified the summary data will be merged.
+     *
+     * @param files
+     *          one or more RFiles to read.
+     * @return this
+     */
+    SummaryFSOptions from(String... files);
+  }
+
+  /**
+   * This is an intermediate interface in a larger builder pattern. Allows optionally setting the FileSystem used to read RFile summary data.
+   *
+   * @since 2.0.0
+   */
+  public static interface SummaryFSOptions extends SummaryOptions {
+    /**
+     * Optionally provide a FileSystem to open RFiles. If not specified, the FileSystem will be constructed using configuration on the classpath.
+     *
+     * @param fs
+     *          use this FileSystem to open files.
+     * @return this
+     */
+    SummaryOptions withFileSystem(FileSystem fs);
+  }
+
+  /**
+   * This is an intermediate interface in a larger builder pattern. Allows setting options for retrieving summary data.
+   *
+   * @since 2.0.0
+   */
+  public static interface SummaryOptions {
+    /**
+     * This method allows retrieving a subset of summary data from a file. If a file has lots of separate summaries, reading a subset may be faster.
+     *
+     * @param summarySelector
+     *          Only read summary data that was generated with configuration that this predicate matches.
+     * @return this
+     */
+    SummaryOptions selectSummaries(Predicate<SummarizerConfiguration> summarySelector);
+
+    /**
+     * Summary data may possibly be stored at a more granular level than the entire file. However there is no guarantee of this. If the data was stored at a
+     * more granular level, then this will get a subset of the summary data. The subset will very likely be an inaccurate approximation.
+     *
+     * @param startRow
+     *          A non-null start row. The start row is exclusive.
+     * @return this
+     *
+     * @see FileStatistics#getExtra()
+     */
+    SummaryOptions startRow(Text startRow);
+
+    /**
+     * @param startRow
+     *          UTF-8 encodes startRow. The start row is exclusive.
+     * @return this
+     * @see #startRow(Text)
+     */
+    SummaryOptions startRow(CharSequence startRow);
+
+    /**
+     * Summary data may possibly be stored at a more granular level than the entire file. However there is no guarantee of this. If the data was stored at a
+     * more granular level, then this will get a subset of the summary data. The subset will very likely be an inaccurate approximation.
+     *
+     * @param endRow
+     *          A non-null end row. The end row is inclusive.
+     * @return this
+     *
+     * @see FileStatistics#getExtra()
+     */
+    SummaryOptions endRow(Text endRow);
+
+    /**
+     * @param endRow
+     *          UTF-8 encodes endRow. The end row is inclusive.
+     * @return this
+     * @see #endRow(Text)
+     */
+    SummaryOptions endRow(CharSequence endRow);
+
+    /**
+     * Reads summary data from the specified RFile(s).
+     *
+     * @return The summary data in the file that satisfied the selection criteria.
+     */
+    Collection<Summary> read() throws IOException;
+  }
+
+  /**
+   * Entry point for reading summary data from RFiles. The returned builder stage is used to specify the RFiles to read.
+   *
+   * @since 2.0.0
+   */
+  public static SummaryInputArguments summaries() {
+    return new RFileSummariesRetriever();
+  }
+
+  /**
    * This is an intermediate interface in a larger builder pattern. Supports setting the required output sink to write a RFile to.
    *
    * @since 1.8.0
@@ -224,6 +343,16 @@ public class RFile {
    * @since 1.8.0
    */
   public static interface WriterOptions {
+
+    /**
+     * Enable generating summary data in the created RFile by running {@link Summarizer}s based on the specified configuration.
+     *
+     * @param summarizerConf
+     *          Configuration for the summarizers to run.
+     * @since 2.0.0
+     */
+    public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf);
+
     /**
      * An option to store sample data in the generated RFile.
      *

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
index 3a55172..cfd331a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
@@ -112,12 +112,14 @@ class RFileScannerBuilder implements RFile.InputArguments, RFile.ScannerFSOption
 
   @Override
   public ScannerOptions from(RFileSource... inputs) {
+    Objects.requireNonNull(inputs); // throws NullPointerException for a null inputs array
     opts.in = new InputArgs(inputs);
     return this;
   }
 
   @Override
   public ScannerFSOptions from(String... files) {
+    Objects.requireNonNull(files); // throws NullPointerException for a null files array
     opts.in = new InputArgs(files);
     return this;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
new file mode 100644
index 0000000..367172a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.rfile.RFile.SummaryFSOptions;
+import org.apache.accumulo.core.client.rfile.RFile.SummaryInputArguments;
+import org.apache.accumulo.core.client.rfile.RFile.SummaryOptions;
+import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.summary.SummaryReader;
+import org.apache.accumulo.core.summary.Gatherer;
+import org.apache.accumulo.core.summary.SummarizerFactory;
+import org.apache.accumulo.core.summary.SummaryCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions, SummaryOptions {
+  // Implements every stage of the summary builder. from(...) must be called first: it creates the InputArgs that withFileSystem() and read() use.
+  private Predicate<SummarizerConfiguration> summarySelector = sc -> true; // default: accept every summarizer configuration
+  private Text startRow; // optional; null until startRow() is called
+  private InputArgs in; // null until one of the from(...) overloads is called
+  private Text endRow; // optional; null until endRow() is called
+
+  @Override
+  public SummaryOptions selectSummaries(Predicate<SummarizerConfiguration> summarySelector) {
+    Objects.requireNonNull(summarySelector);
+    this.summarySelector = summarySelector;
+    return this;
+  }
+
+  @Override
+  public SummaryOptions startRow(CharSequence startRow) {
+    return startRow(new Text(startRow.toString()));
+  }
+
+  @Override
+  public SummaryOptions startRow(Text startRow) {
+    Objects.requireNonNull(startRow);
+    this.startRow = startRow;
+    return this;
+  }
+
+  @Override
+  public SummaryOptions endRow(CharSequence endRow) {
+    return endRow(new Text(endRow.toString()));
+  }
+
+  @Override
+  public SummaryOptions endRow(Text endRow) {
+    Objects.requireNonNull(endRow);
+    this.endRow = endRow;
+    return this;
+  }
+
+  @Override
+  public Collection<Summary> read() throws IOException {
+    SummarizerFactory factory = new SummarizerFactory();
+    AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
+    Configuration conf = in.getFileSystem().getConf();
+    // Resolve the sources once so the streams read in the loop are exactly the streams closed in the finally block.
+    RFileSource[] sources = in.getSources();
+    try {
+      SummaryCollection all = new SummaryCollection();
+      for (RFileSource source : sources) {
+        SummaryReader fileSummary = SummaryReader.load(conf, acuconf, source.getInputStream(), source.getLength(), summarySelector, factory);
+        SummaryCollection sc = fileSummary.getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow)));
+        all.merge(sc, factory); // fold this file's summaries into the combined result
+      }
+
+      return all.getSummaries();
+    } finally {
+      for (RFileSource source : sources) {
+        source.getInputStream().close();
+      }
+    }
+  }
+
+  @Override
+  public SummaryOptions withFileSystem(FileSystem fs) {
+    Objects.requireNonNull(fs);
+    this.in.fs = fs;
+    return this;
+  }
+
+  @Override
+  public SummaryOptions from(RFileSource... inputs) {
+    Objects.requireNonNull(inputs);
+    in = new InputArgs(inputs);
+    return this;
+  }
+
+  @Override
+  public SummaryFSOptions from(String... files) {
+    Objects.requireNonNull(files);
+    in = new InputArgs(files);
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
index 9995888..9ae7fb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
@@ -210,6 +210,32 @@ public class RFileWriter implements AutoCloseable {
   }
 
   /**
+   * Convenience overload that wraps {@code value} in a {@link Value} and delegates to {@link #append(Key, Value)}, so it has the same behavior.
+   *
+   * @param key
+   *          Same restrictions on key as {@link #append(Key, Value)}.
+   * @param value
+   *          this parameter will be UTF-8 encoded. Must be non-null.
+   * @since 2.0.0
+   */
+  public void append(Key key, CharSequence value) throws IOException {
+    append(key, new Value(value));
+  }
+
+  /**
+   * Convenience overload that wraps {@code value} in a {@link Value} and delegates to {@link #append(Key, Value)}, so it has the same behavior.
+   *
+   * @param key
+   *          Same restrictions on key as {@link #append(Key, Value)}.
+   * @param value
+   *          Must be non-null.
+   * @since 2.0.0
+   */
+  public void append(Key key, byte[] value) throws IOException {
+    append(key, new Value(value));
+  }
+
+  /**
    * Append the keys and values to the last locality group that was started.
    *
    * @param keyValues

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
index 667cbef..a7decb1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@ -17,6 +17,8 @@
 
 package org.apache.accumulo.core.client.rfile;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
@@ -28,6 +30,7 @@ import java.util.Objects;
 import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions;
 import org.apache.accumulo.core.client.rfile.RFile.WriterOptions;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.file.FileOperations;
@@ -59,15 +62,21 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
   }
 
   private OutputArgs out;
-  private SamplerConfiguration sampler = null;
   private Map<String,String> tableConfig = Collections.emptyMap();
   private int visCacheSize = 1000;
+  private Map<String,String> samplerProps = Collections.emptyMap();
+  private Map<String,String> summarizerProps = Collections.emptyMap();
+
+  private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, String kind) { // kind is only used in the error message
+    checkArgument(Collections.disjoint(props.keySet(), derivedProps.keySet()), "Properties and derived %s properties are not disjoint", kind);
+  }
 
   @Override
   public WriterOptions withSampler(SamplerConfiguration samplerConf) {
     Objects.requireNonNull(samplerConf);
-    SamplerConfigurationImpl.checkDisjoint(tableConfig, samplerConf);
-    this.sampler = samplerConf;
+    Map<String,String> tmp = new SamplerConfigurationImpl(samplerConf).toTablePropertiesMap();
+    checkDisjoint(tableConfig, tmp, "sampler");
+    this.samplerProps = tmp;
     return this;
   }
 
@@ -76,10 +85,10 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
     FileOperations fileops = FileOperations.getInstance();
     AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
     HashMap<String,String> userProps = new HashMap<>();
-    if (sampler != null) {
-      userProps.putAll(new SamplerConfigurationImpl(sampler).toTablePropertiesMap());
-    }
+
     userProps.putAll(tableConfig);
+    userProps.putAll(summarizerProps);
+    userProps.putAll(samplerProps);
 
     if (userProps.size() > 0) {
       acuconf = new ConfigurationCopy(Iterables.concat(acuconf, userProps.entrySet()));
@@ -92,10 +101,11 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
       } else {
         fsdo = new FSDataOutputStream(out.getOutputStream(), new FileSystem.Statistics("foo"));
       }
-      return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf).build(), visCacheSize);
+      return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf)
+          .setAccumuloStartEnabled(false).build(), visCacheSize);
     } else {
       return new RFileWriter(fileops.newWriterBuilder().forFile(out.path.toString(), out.getFileSystem(), out.getConf()).withTableConfiguration(acuconf)
-          .build(), visCacheSize);
+          .setAccumuloStartEnabled(false).build(), visCacheSize);
     }
   }
 
@@ -128,7 +138,8 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
       cfg.put(entry.getKey(), entry.getValue());
     }
 
-    SamplerConfigurationImpl.checkDisjoint(cfg, sampler);
+    checkDisjoint(cfg, samplerProps, "sampler");
+    checkDisjoint(cfg, summarizerProps, "summarizer");
     this.tableConfig = cfg;
     return this;
   }
@@ -145,4 +156,13 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
     this.visCacheSize = maxSize;
     return this;
   }
+
+  @Override
+  public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) {
+    Objects.requireNonNull(summarizerConf);
+    Map<String,String> tmp = SummarizerConfiguration.toTableProperties(summarizerConf);
+    checkDisjoint(tableConfig, tmp, "summarizer");
+    this.summarizerProps = tmp;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java b/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java
new file mode 100644
index 0000000..a0f9bc5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.summary;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a convenience class for interpreting summary data generated by implementations of {@link CountingSummarizer}.
+ *
+ * @since 2.0.0
+ */
+
+public class CounterSummary {
+  private Map<String,Long> stats;
+
+  /**
+   * This constructor calls {@link #CounterSummary(Summary, boolean)} with true.
+   */
+  public CounterSummary(Summary summary) {
+    this(summary, true);
+  }
+
+  /**
+   * @param summary
+   *          a summary
+   * @param checkType
+   *          If true will try to ensure the classname from {@link Summary#getSummarizerConfiguration()} is an instance of {@link CountingSummarizer}. However
+   *          this check can only succeed if the class is on the classpath. For cases where the summary data needs to be used and the class is not on the
+   *          classpath, set this to false.
+   */
+  public CounterSummary(Summary summary, boolean checkType) {
+    if (checkType) {
+      String className = summary.getSummarizerConfiguration().getClassName();
+      try {
+        getClass().getClassLoader().loadClass(className).asSubclass(CountingSummarizer.class);
+      } catch (ClassCastException e) {
+        throw new IllegalArgumentException(className + " is not an instance of " + CountingSummarizer.class.getSimpleName(), e);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Unable to check summary was produced by a " + CountingSummarizer.class.getSimpleName(), e);
+      }
+    }
+    this.stats = summary.getStatistics();
+  }
+
+  @VisibleForTesting
+  CounterSummary(Map<String,Long> stats) {
+    this.stats = stats;
+  }
+
+  /**
+   * @return statistic for {@link CountingSummarizer#SEEN_STAT}
+   */
+  public long getSeen() {
+    return stats.getOrDefault(CountingSummarizer.SEEN_STAT, 0l);
+  }
+
+  /**
+   * @return statistic for {@link CountingSummarizer#EMITTED_STAT}
+   */
+  public long getEmitted() {
+    return stats.getOrDefault(CountingSummarizer.EMITTED_STAT, 0l);
+  }
+
+  /**
+   * @return the sum of {@link #getTooLong()} and {@link #getTooMany()}
+   */
+  public long getIgnored() {
+    return getTooLong() + getTooMany();
+  }
+
+  /**
+   * @return statistic for {@link CountingSummarizer#TOO_LONG_STAT}
+   */
+  public long getTooLong() {
+    return stats.getOrDefault(CountingSummarizer.TOO_LONG_STAT, 0l);
+  }
+
+  /**
+   * @return statistic for {@link CountingSummarizer#TOO_MANY_STAT}
+   */
+  public long getTooMany() {
+    return stats.getOrDefault(CountingSummarizer.TOO_MANY_STAT, 0l);
+  }
+
+  /**
+   * @return statistic for {@link CountingSummarizer#DELETES_IGNORED_STAT}
+   */
+  public long getDeletesIgnored() {
+    return stats.getOrDefault(CountingSummarizer.DELETES_IGNORED_STAT, 0l);
+  }
+
+  /**
+   * @return All statistics with a prefix of {@link CountingSummarizer#COUNTER_STAT_PREFIX} with the prefix stripped off.
+   */
+  public Map<String,Long> getCounters() {
+    HashMap<String,Long> ret = new HashMap<>();
+    for (Entry<String,Long> entry : stats.entrySet()) {
+      if (entry.getKey().startsWith(CountingSummarizer.COUNTER_STAT_PREFIX)) {
+        ret.put(entry.getKey().substring(CountingSummarizer.COUNTER_STAT_PREFIX.length()), entry.getValue());
+      }
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java b/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java
new file mode 100644
index 0000000..b3e1b68
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.summary;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.mutable.MutableLong;
+
+//checkstyle and formatter are in conflict
+//@formatter:off
+/**
+ * This class counts arbitrary keys while defending against too many keys and keys that are too long.
+ *
+ * <p>
+ * During collection and summarization this class will use the functions from {@link #converter()} and {@link #encoder()}. For each key/value the function from
+ * {@link #converter()} will be called to create zero or more counter objects. A counter associated with each counter object will be incremented, as long as
+ * there are not too many counters and the counter object is not too long.
+ *
+ * <p>
+ * When {@link Summarizer.Collector#summarize(Summarizer.StatisticConsumer)} is called, the function from {@link #encoder()} will be used to convert counter
+ * objects to strings. These strings will be used to emit statistics. Overriding {@link #encoder()} is optional. One reason to override is if the counter object
+ * contains binary or special data. For example, a function that base64 encodes counter objects could be created.
+ *
+ * <p>
+ * If the counter key type is mutable, then consider overriding {@link #copier()}.
+ *
+ * <p>
+ * The function returned by {@link #converter()} will be called frequently and should be very efficient. The function returned by {@link #encoder()} will be
+ * called less frequently and can be more expensive. The reason these two functions exist is to avoid the conversion to string for each key value, if that
+ * conversion is unnecessary.
+ *
+ * <p>
+ * Below is an example implementation that counts column visibilities. This example avoids converting column visibility to string for each key/value. This
+ * example shows the source code for {@link VisibilitySummarizer}.
+ *
+ * <pre>
+ * <code>
+ *   public class VisibilitySummarizer extends CountingSummarizer&lt;ByteSequence&gt; {
+ *     &#064;Override
+ *     protected UnaryOperator&lt;ByteSequence&gt; copier() {
+ *       // ByteSequences are mutable, so override and provide a copy function
+ *       return ArrayByteSequence::new;
+ *     }
+ *
+ *     &#064;Override
+ *     protected Converter&lt;ByteSequence&gt; converter() {
+ *       return (key, val, consumer) -&gt; consumer.accept(key.getColumnVisibilityData());
+ *     }
+ *   }
+ * </code>
+ * </pre>
+ *
+ * @param <K>
+ *          The counter key type. This type must have good implementations of {@link Object#hashCode()} and {@link Object#equals(Object)}.
+ * @see CounterSummary
+ * @since 2.0.0
+ */
+//@formatter:on
+public abstract class CountingSummarizer<K> implements Summarizer {
+
+  /**
+   * A configuration option for specifying the maximum number of unique counters an instance of this summarizer should track. If not specified, a default of
+   * {@value #MAX_COUNTER_DEFAULT} will be used.
+   */
+  public static final String MAX_COUNTERS_OPT = "maxCounters";
+
+  /**
+   * A configuration option for specifying the maximum length of an individual counter key. If not specified, a default of {@value #MAX_CKL_DEFAULT} will be
+   * used.
+   */
+  public static final String MAX_COUNTER_LEN_OPT = "maxCounterLen";
+
+  /**
+   * A configuration option to determine if delete keys should be counted. If set to true then delete keys will not be passed to the {@link Converter} and the
+   * statistic {@value #DELETES_IGNORED_STAT} will track the number of deletes ignored. This option defaults to {@value #INGNORE_DELETES_DEFAULT}.
+   */
+  public static final String INGNORE_DELETES_OPT = "ignoreDeletes";
+
+  /**
+   * This prefixes all counters when emitting statistics in {@link Summarizer.Collector#summarize(Summarizer.StatisticConsumer)}.
+   */
+  public static final String COUNTER_STAT_PREFIX = "c:";
+
+  /**
+   * This is the name of the statistic that tracks how many counter objects were ignored because the number of unique counters was exceeded. The max number of
+   * unique counters is specified by {@link #MAX_COUNTERS_OPT}.
+   */
+  public static final String TOO_MANY_STAT = "tooMany";
+
+  /**
+   * This is the name of the statistic that tracks how many counter objects were ignored because they were too long. The maximum length is specified by
+   * {@link #MAX_COUNTER_LEN_OPT}.
+   */
+  public static final String TOO_LONG_STAT = "tooLong";
+
+  /**
+   * This is the name of the statistic that tracks the total number of counter objects emitted by the {@link Converter}. This includes emitted Counter objects
+   * that were ignored.
+   */
+  public static final String EMITTED_STAT = "emitted";
+
+  /**
+   * This is the name of the statistic that tracks the total number of deleted keys seen. This statistic is only incremented when the
+   * {@value #INGNORE_DELETES_OPT} option is set to true.
+   */
+  public static final String DELETES_IGNORED_STAT = "deletesIgnored";
+
+  /**
+   * This tracks the total number of key/values seen by the {@link Summarizer.Collector}
+   */
+  public static final String SEEN_STAT = "seen";
+
+  // this default can not be changed as persisted summary data depends on it. See the documentation about persistence in the Summarizer class javadoc.
+  public static final String MAX_COUNTER_DEFAULT = "1024";
+
+  // this default can not be changed as persisted summary data depends on it
+  public static final String MAX_CKL_DEFAULT = "128";
+
+  // this default can not be changed as persisted summary data depends on it
+  public static final String INGNORE_DELETES_DEFAULT = "true";
+
+  private static final String[] ALL_STATS = new String[] {TOO_LONG_STAT, TOO_MANY_STAT, EMITTED_STAT, SEEN_STAT, DELETES_IGNORED_STAT};
+
+  private int maxCounters;
+  private int maxCounterKeyLen;
+  private boolean ignoreDeletes;
+
+  private void init(SummarizerConfiguration conf) {
+    maxCounters = Integer.parseInt(conf.getOptions().getOrDefault(MAX_COUNTERS_OPT, MAX_COUNTER_DEFAULT));
+    maxCounterKeyLen = Integer.parseInt(conf.getOptions().getOrDefault(MAX_COUNTER_LEN_OPT, MAX_CKL_DEFAULT));
+    ignoreDeletes = Boolean.parseBoolean(conf.getOptions().getOrDefault(INGNORE_DELETES_OPT, INGNORE_DELETES_DEFAULT));
+  }
+
+  /**
+   * A function that converts key values to zero or more counter objects.
+   *
+   * @since 2.0.0
+   */
+  public static interface Converter<K> {
+    /**
+     * @param consumer
+     *          emit counter objects derived from key and value to this consumer
+     */
+    public void convert(Key k, Value v, Consumer<K> consumer);
+  }
+
+  /**
+   *
+   * @return A function that is used to convert each key value to zero or more counter objects. Each function returned should be independent.
+   */
+  protected abstract Converter<K> converter();
+
+  /**
+   * @return A function that is used to convert counter objects to String. The default function calls {@link Object#toString()} on the counter object.
+   */
+  protected Function<K,String> encoder() {
+    return Object::toString;
+  }
+
+  /**
+   * Override this if your key type is mutable and subject to change.
+   *
+   * @return a function that is used to copy the counter object. This function is only used when the collector has never seen the counter object before. In this
+   *         case the collector needs to possibly copy the counter object before using it as a map key. The default implementation is the
+   *         {@link UnaryOperator#identity()} function.
+   */
+  protected UnaryOperator<K> copier() {
+    return UnaryOperator.identity();
+  }
+
+  @Override
+  public Collector collector(SummarizerConfiguration sc) {
+    init(sc);
+    return new Collector() {
+
+      // Map used for computing the summary incrementally is keyed by the counter object (type K), which is more efficient than converting to String for each
+      // Key. The encoder is applied only to counter objects not yet in the map (for the length check) and when the summary is requested.
+
+      private Map<K,MutableLong> counters = new HashMap<>();
+      private long tooMany = 0;
+      private long tooLong = 0;
+      private long seen = 0;
+      private long emitted = 0;
+      private long deleted = 0;
+      private Converter<K> converter = converter();
+      private Function<K,String> encoder = encoder();
+      private UnaryOperator<K> copier = copier();
+
+      private void incrementCounter(K counter) {
+        emitted++;
+
+        MutableLong ml = counters.get(counter);
+        if (ml == null) {
+          if (counters.size() >= maxCounters) {
+            // no need to store this counter in the map and get() it... just use instance variable
+            tooMany++;
+          } else {
+            // we have never seen this key before, check if it's too long
+            if (encoder.apply(counter).length() >= maxCounterKeyLen) {
+              tooLong++;
+            } else {
+              counters.put(copier.apply(counter), new MutableLong(1));
+            }
+          }
+        } else {
+          // using mutable long allows calling put() to be avoided
+          ml.increment();
+        }
+      }
+
+      @Override
+      public void accept(Key k, Value v) {
+        seen++;
+        if (ignoreDeletes && k.isDeleted()) {
+          deleted++;
+        } else {
+          converter.convert(k, v, this::incrementCounter);
+        }
+      }
+
+      @Override
+      public void summarize(StatisticConsumer sc) {
+        StringBuilder sb = new StringBuilder(COUNTER_STAT_PREFIX);
+
+        for (Entry<K,MutableLong> entry : counters.entrySet()) {
+          sb.setLength(COUNTER_STAT_PREFIX.length());
+          sb.append(encoder.apply(entry.getKey()));
+          sc.accept(sb.toString(), entry.getValue().longValue());
+        }
+
+        sc.accept(TOO_MANY_STAT, tooMany);
+        sc.accept(TOO_LONG_STAT, tooLong);
+        sc.accept(EMITTED_STAT, emitted);
+        sc.accept(SEEN_STAT, seen);
+        sc.accept(DELETES_IGNORED_STAT, deleted);
+      }
+    };
+  }
+
+  @Override
+  public Combiner combiner(SummarizerConfiguration sc) {
+    init(sc);
+    return new Combiner() {
+
+      @Override
+      public void merge(Map<String,Long> summary1, Map<String,Long> summary2) {
+
+        for (String key : ALL_STATS) {
+          summary1.merge(key, summary2.getOrDefault(key, 0l), Long::sum);
+        }
+
+        for (Entry<String,Long> entry : summary2.entrySet()) {
+          String k2 = entry.getKey();
+          Long v2 = entry.getValue();
+
+          if (k2.startsWith(COUNTER_STAT_PREFIX)) {
+            summary1.merge(k2, v2, Long::sum);
+          }
+        }
+
+        if (summary1.size() - ALL_STATS.length > maxCounters) {
+          // find the keys with the lowest counts to remove
+          List<String> keysToRemove = summary1.entrySet().stream().filter(e -> e.getKey().startsWith(COUNTER_STAT_PREFIX)) // filter out non counters
+              .sorted((e1, e2) -> Long.compare(e2.getValue(), e1.getValue())) // sort descending by count
+              .skip(maxCounters) // skip most frequent
+              .map(e -> e.getKey()).collect(Collectors.toList()); // collect the least frequent counters in a list
+
+          long removedCount = 0;
+          for (String key : keysToRemove) {
+            removedCount += summary1.remove(key);
+          }
+
+          summary1.merge(TOO_MANY_STAT, removedCount, Long::sum);
+        }
+      }
+    };
+  }
+}