You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/03/20 14:49:05 UTC
[9/9] accumulo git commit: ACCUMULO-4501 ACCUMULO-96 Added
Summarization
ACCUMULO-4501 ACCUMULO-96 Added Summarization
closes apache/accumulo#224
closes apache/accumulo#168
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/94cdcc4d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/94cdcc4d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/94cdcc4d
Branch: refs/heads/master
Commit: 94cdcc4d3f0a8ccf95894f206cb71e6117f4e51d
Parents: 68ba2ef
Author: Keith Turner <kt...@apache.org>
Authored: Mon Mar 20 10:47:00 2017 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Mar 20 10:47:00 2017 -0400
----------------------------------------------------------------------
.../client/admin/NewTableConfiguration.java | 49 +-
.../core/client/admin/SummaryRetriever.java | 112 +
.../core/client/admin/TableOperations.java | 65 +
.../accumulo/core/client/impl/ServerClient.java | 36 +-
.../core/client/impl/TableOperationsImpl.java | 151 +-
.../client/mapred/AccumuloFileOutputFormat.java | 16 +
.../mapreduce/AccumuloFileOutputFormat.java | 16 +
.../lib/impl/FileOutputConfigurator.java | 9 +
.../core/client/mock/MockTableOperations.java | 27 +
.../accumulo/core/client/rfile/RFile.java | 129 +
.../core/client/rfile/RFileScannerBuilder.java | 2 +
.../client/rfile/RFileSummariesRetriever.java | 122 +
.../accumulo/core/client/rfile/RFileWriter.java | 26 +
.../core/client/rfile/RFileWriterBuilder.java | 38 +-
.../core/client/summary/CounterSummary.java | 123 +
.../core/client/summary/CountingSummarizer.java | 302 +
.../core/client/summary/Summarizer.java | 227 +
.../client/summary/SummarizerConfiguration.java | 285 +
.../accumulo/core/client/summary/Summary.java | 145 +
.../summary/summarizers/DeletesSummarizer.java | 75 +
.../summary/summarizers/FamilySummarizer.java | 46 +
.../summarizers/VisibilitySummarizer.java | 47 +
.../core/compaction/CompactionSettings.java | 2 +
.../org/apache/accumulo/core/conf/Property.java | 22 +-
.../apache/accumulo/core/conf/PropertyType.java | 9 +-
.../accumulo/core/data/ArrayByteSequence.java | 18 +
.../accumulo/core/data/thrift/TRowRange.java | 521 ++
.../accumulo/core/data/thrift/TSummaries.java | 831 +++
.../data/thrift/TSummarizerConfiguration.java | 649 ++
.../accumulo/core/data/thrift/TSummary.java | 842 +++
.../core/data/thrift/TSummaryRequest.java | 760 +++
.../accumulo/core/file/BloomFilterLayer.java | 10 +-
.../core/file/DispatchingFileFactory.java | 7 +-
.../accumulo/core/file/FileOperations.java | 18 +
.../accumulo/core/file/rfile/PrintInfo.java | 7 +
.../core/file/rfile/RFileOperations.java | 2 +-
.../core/metadata/schema/MetadataScanner.java | 236 +
.../core/metadata/schema/TabletMetadata.java | 182 +
.../sample/impl/SamplerConfigurationImpl.java | 12 -
.../core/sample/impl/SamplerFactory.java | 8 +-
.../accumulo/core/security/TablePermission.java | 5 +-
.../apache/accumulo/core/summary/Gatherer.java | 631 ++
.../summary/SummarizerConfigurationUtil.java | 128 +
.../core/summary/SummarizerFactory.java | 63 +
.../core/summary/SummaryCollection.java | 188 +
.../accumulo/core/summary/SummaryInfo.java | 53 +
.../accumulo/core/summary/SummaryReader.java | 257 +
.../core/summary/SummarySerializer.java | 542 ++
.../accumulo/core/summary/SummaryWriter.java | 157 +
.../thrift/TabletClientService.java | 5642 +++++++++++++++++-
.../accumulo/core/util/CancelFlagFuture.java | 67 +
.../core/util/CompletableFutureUtil.java | 49 +
core/src/main/thrift/data.thrift | 34 +
core/src/main/thrift/tabletserver.thrift | 5 +
.../client/impl/TableOperationsHelperTest.java | 26 +
.../mapred/AccumuloFileOutputFormatTest.java | 18 +
.../mapreduce/AccumuloFileOutputFormatTest.java | 18 +
.../accumulo/core/client/rfile/RFileTest.java | 158 +-
.../client/summary/CountingSummarizerTest.java | 259 +
.../core/summary/SummaryCollectionTest.java | 72 +
.../core/util/CompletableFutureUtilTest.java | 53 +
.../main/asciidoc/accumulo_user_manual.asciidoc | 2 +
docs/src/main/asciidoc/chapters/summaries.txt | 232 +
.../standalone/StandaloneAccumuloCluster.java | 3 +-
.../standalone/StandaloneClusterControl.java | 3 +-
.../impl/MiniAccumuloConfigImpl.java | 3 +-
.../server/security/SecurityOperation.java | 5 +
.../apache/accumulo/tserver/TabletServer.java | 115 +
.../tserver/TabletServerResourceManager.java | 49 +-
.../tserver/compaction/CompactionStrategy.java | 1 -
.../compaction/MajorCompactionRequest.java | 84 +-
.../ConfigurableCompactionStrategy.java | 99 +-
.../TooManyDeletesCompactionStrategy.java | 173 +
.../tserver/session/SummarySession.java | 42 +
.../apache/accumulo/tserver/tablet/Tablet.java | 9 +-
.../DefaultCompactionStrategyTest.java | 2 +-
.../SizeLimitCompactionStrategyTest.java | 2 +-
.../TwoTierCompactionStrategyTest.java | 6 +-
.../ConfigurableCompactionStrategyTest.java | 2 +-
.../java/org/apache/accumulo/shell/Shell.java | 4 +-
.../accumulo/shell/commands/CompactCommand.java | 8 +-
.../shell/commands/SummariesCommand.java | 115 +
.../org/apache/accumulo/test/ShellServerIT.java | 129 +-
.../test/functional/BasicSummarizer.java | 80 +
.../accumulo/test/functional/SummaryIT.java | 820 +++
.../test/functional/TooManyDeletesIT.java | 121 +
.../test/performance/thrift/NullTserver.java | 33 +-
87 files changed, 16482 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
index 4694e1e..9d5d31a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
@@ -19,14 +19,18 @@ package org.apache.accumulo.core.client.admin;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.summary.SummarizerConfigurationUtil;
/**
* This object stores table creation parameters. Currently includes: {@link TimeType}, whether to include default iterators, and user-specified initial
@@ -41,8 +45,13 @@ public class NewTableConfiguration {
private boolean limitVersion = true;
- private Map<String,String> properties = new HashMap<>();
- private SamplerConfiguration samplerConfiguration;
+ private Map<String,String> properties = Collections.emptyMap();
+ private Map<String,String> samplerProps = Collections.emptyMap();
+ private Map<String,String> summarizerProps = Collections.emptyMap();
+
+ private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, String kind) {
+ checkArgument(Collections.disjoint(props.keySet(), derivedProps.keySet()), "Properties and derived %s properties are not disjoint", kind);
+ }
/**
* Configure logical or millisecond time for tables created with this configuration.
@@ -82,15 +91,15 @@ public class NewTableConfiguration {
* Sets additional properties to be applied to tables created with this configuration. Additional calls to this method replaces properties set by previous
* calls.
*
- * @param prop
+ * @param props
* additional properties to add to the table when it is created
* @return this
*/
- public NewTableConfiguration setProperties(Map<String,String> prop) {
- checkArgument(prop != null, "properties is null");
- SamplerConfigurationImpl.checkDisjoint(prop, samplerConfiguration);
-
- this.properties = new HashMap<>(prop);
+ public NewTableConfiguration setProperties(Map<String,String> props) {
+ checkArgument(props != null, "properties is null");
+ checkDisjoint(props, samplerProps, "sampler");
+ checkDisjoint(props, summarizerProps, "summarizer");
+ this.properties = new HashMap<>(props);
return this;
}
@@ -106,10 +115,8 @@ public class NewTableConfiguration {
propertyMap.putAll(IteratorUtil.generateInitialTableProperties(limitVersion));
}
- if (samplerConfiguration != null) {
- propertyMap.putAll(new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap());
- }
-
+ propertyMap.putAll(summarizerProps);
+ propertyMap.putAll(samplerProps);
propertyMap.putAll(properties);
return Collections.unmodifiableMap(propertyMap);
}
@@ -121,8 +128,22 @@ public class NewTableConfiguration {
*/
public NewTableConfiguration enableSampling(SamplerConfiguration samplerConfiguration) {
requireNonNull(samplerConfiguration);
- SamplerConfigurationImpl.checkDisjoint(properties, samplerConfiguration);
- this.samplerConfiguration = samplerConfiguration;
+ Map<String,String> tmp = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap();
+ checkDisjoint(properties, tmp, "sampler");
+ this.samplerProps = tmp;
+ return this;
+ }
+
+ /**
+ * Enables creating summary statistics using {@link Summarizer}s for the new table.
+ *
+ * @since 2.0.0
+ */
+ public NewTableConfiguration enableSummarization(SummarizerConfiguration... configs) {
+ requireNonNull(configs);
+ Map<String,String> tmp = SummarizerConfigurationUtil.toTablePropertiesMap(Arrays.asList(configs));
+ checkDisjoint(properties, tmp, "summarizer");
+ summarizerProps = tmp;
return this;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java
new file mode 100644
index 0000000..8dcf048
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/SummaryRetriever.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.admin;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This interface allows configuring where and which summary data to retrieve before retrieving it.
+ *
+ * @since 2.0.0
+ */
+public interface SummaryRetriever {
+
+ /**
+ * Forces a flush of data in memory to files before summary data is retrieved. Data recently written to Accumulo may be in memory. Summary data is only
+ * retrieved from files. Therefore recently written data may not be represented in summaries, unless this option is set to true. This is optional and
+ * defaults to false.
+ *
+ * @return this
+ */
+ SummaryRetriever flush(boolean shouldFlush);
+
+ /**
+ * The start row is not inclusive. Calling this method is optional.
+ */
+ SummaryRetriever startRow(Text startRow);
+
+ /**
+ * The start row is not inclusive. Calling this method is optional.
+ */
+ SummaryRetriever startRow(CharSequence startRow);
+
+ /**
+ * The end row is inclusive. Calling this method is optional.
+ */
+ SummaryRetriever endRow(Text endRow);
+
+ /**
+ * The end row is inclusive. Calling this method is optional.
+ */
+ SummaryRetriever endRow(CharSequence endRow);
+
+ /**
+ * Filters which summary data is retrieved. By default all summary data present is retrieved. If only a subset of summary data is needed, then it's best to be
+ * selective in order to avoid polluting the summary data cache.
+ *
+ * <p>
+ * Each set of summary data is generated using a specific {@link SummarizerConfiguration}. The methods {@link #withConfiguration(Collection)} and
+ * {@link #withConfiguration(SummarizerConfiguration...)} allow selecting sets of summary data based on exact {@link SummarizerConfiguration} matches. This
+ * method enables less exact matching using regular expressions.
+ *
+ * <p>
+ * The regular expression passed to this method is used in the following way on the server side to match {@link SummarizerConfiguration} objects. When a
+ * {@link SummarizerConfiguration} matches, the summary data generated using that configuration is returned.
+ *
+ * <pre>
+ * <code>
+ * boolean doesConfigurationMatch(SummarizerConfiguration conf, String regex) {
+ * // This is how conf is converted to a String in tablet servers for matching.
+ * // The options are sorted to make writing regular expressions easier.
+ * String confString = conf.getClassName()+" "+new TreeMap<>(conf.getOptions());
+ * return Pattern.compile(regex).matcher(confString).matches();
+ * }
+ * </code>
+ * </pre>
+ */
+ SummaryRetriever withMatchingConfiguration(String regex);
+
+ /**
+ * Allows specifying a set of summaries, generated using the specified configs, to retrieve. By default will retrieve all present.
+ *
+ * <p>
+ * Using this method to be more selective may pull less data into the tablet server summary cache.
+ */
+ SummaryRetriever withConfiguration(SummarizerConfiguration... config);
+
+ /**
+ * Allows specifying a set of summaries, generated using the specified configs, to retrieve. By default will retrieve all present.
+ *
+ * <p>
+ * Using this method to be more selective may pull less data into the tablet server summary cache.
+ */
+ SummaryRetriever withConfiguration(Collection<SummarizerConfiguration> configs);
+
+ /**
+ * @return the summaries selected by the options configured on this retriever
+ */
+ List<Summary> retrieve() throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index cabcfa3..f88d28e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
+import java.util.function.Predicate;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -34,9 +35,12 @@ import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
import org.apache.hadoop.io.Text;
/**
@@ -808,4 +812,65 @@ public interface TableOperations {
* @since 1.8.0
*/
SamplerConfiguration getSamplerConfiguration(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+
+ /**
+ * Entry point for retrieving summaries with optional restrictions.
+ *
+ * <p>
+ * In order to retrieve Summaries, the Accumulo user making the request will need the {@link TablePermission#GET_SUMMARIES} table permission.
+ *
+ * <p>
+ * Accumulo stores summary data with each file in each tablet. In order to make retrieving it faster there is a per tablet server cache of summary data. When
+ * summary data for a file is not present, it will be retrieved using threads on the tserver. The tablet server properties
+ * {@code tserver.summary.partition.threads}, {@code tserver.summary.remote.threads}, {@code tserver.summary.retrieval.threads}, and
+ * {@code tserver.cache.summary.size} impact the performance of retrieving summaries.
+ *
+ * <p>
+ * Since summary data is cached, it's important to use the summary selection options to only read the needed data into the cache.
+ *
+ * <p>
+ * Summary data will be merged on the tablet servers and then in this client process. Therefore it's important that the required summarizers are on the
+ * client's classpath.
+ *
+ * @since 2.0.0
+ * @see Summarizer
+ */
+ SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+
+ /**
+ * Enables summary generation for this table for future compactions.
+ *
+ * @param tableName
+ * add summarizers to this table
+ * @param summarizers
+ * summarizers to add
+ * @throws IllegalArgumentException
+ * When new summarizers have the same property id as each other, or when the same summarizers were previously added.
+ * @since 2.0.0
+ * @see SummarizerConfiguration#toTableProperties()
+ * @see SummarizerConfiguration#toTableProperties(SummarizerConfiguration...)
+ * @see SummarizerConfiguration#toTableProperties(Collection)
+ */
+ void addSummarizers(String tableName, SummarizerConfiguration... summarizers) throws TableNotFoundException, AccumuloException, AccumuloSecurityException;
+
+ /**
+ * Removes summary generation for this table for the matching summarizers.
+ *
+ * @param tableName
+ * remove summarizers from this table
+ * @param predicate
+ * removes all summarizers whose configuration matches this predicate
+ * @since 2.0.0
+ */
+ void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException,
+ AccumuloSecurityException;
+
+ /**
+ * @param tableName
+ * list summarizers for this table
+ * @return the summarizers currently configured for the table
+ * @since 2.0.0
+ * @see SummarizerConfiguration#fromTableProperties(Map)
+ */
+ List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
index a4853f0..9d18f99 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
@@ -36,6 +36,9 @@ import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -47,8 +50,13 @@ public class ServerClient {
private static final Logger log = LoggerFactory.getLogger(ServerClient.class);
public static <T> T execute(ClientContext context, ClientExecReturn<T,ClientService.Client> exec) throws AccumuloException, AccumuloSecurityException {
+ return execute(context, new ClientService.Client.Factory(), exec);
+ }
+
+ public static <CT extends TServiceClient,RT> RT execute(ClientContext context, TServiceClientFactory<CT> factory, ClientExecReturn<RT,CT> exec)
+ throws AccumuloException, AccumuloSecurityException {
try {
- return executeRaw(context, exec);
+ return executeRaw(context, factory, exec);
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (AccumuloException e) {
@@ -71,14 +79,21 @@ public class ServerClient {
}
public static <T> T executeRaw(ClientContext context, ClientExecReturn<T,ClientService.Client> exec) throws Exception {
+ return executeRaw(context, new ClientService.Client.Factory(), exec);
+ }
+
+ public static <CT extends TServiceClient,RT> RT executeRaw(ClientContext context, TServiceClientFactory<CT> factory, ClientExecReturn<RT,CT> exec)
+ throws Exception {
while (true) {
- ClientService.Client client = null;
+ CT client = null;
String server = null;
try {
- Pair<String,Client> pair = ServerClient.getConnection(context);
+ Pair<String,CT> pair = ServerClient.getConnection(context, factory);
server = pair.getFirst();
client = pair.getSecond();
return exec.execute(client);
+ } catch (TApplicationException tae) {
+ throw new AccumuloServerException(server, tae);
} catch (TTransportException tte) {
log.debug("ClientService request failed " + server + ", retrying ... ", tte);
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -99,6 +114,8 @@ public class ServerClient {
client = pair.getSecond();
exec.execute(client);
break;
+ } catch (TApplicationException tae) {
+ throw new AccumuloServerException(server, tae);
} catch (TTransportException tte) {
log.debug("ClientService request failed " + server + ", retrying ... ", tte);
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -115,12 +132,21 @@ public class ServerClient {
return getConnection(context, true);
}
+ public static <CT extends TServiceClient> Pair<String,CT> getConnection(ClientContext context, TServiceClientFactory<CT> factory) throws TTransportException {
+ return getConnection(context, factory, true, context.getClientTimeoutInMillis());
+ }
+
public static Pair<String,ClientService.Client> getConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException {
return getConnection(context, preferCachedConnections, context.getClientTimeoutInMillis());
}
public static Pair<String,ClientService.Client> getConnection(ClientContext context, boolean preferCachedConnections, long rpcTimeout)
throws TTransportException {
+ return getConnection(context, new ClientService.Client.Factory(), preferCachedConnections, rpcTimeout);
+ }
+
+ public static <CT extends TServiceClient> Pair<String,CT> getConnection(ClientContext context, TServiceClientFactory<CT> factory,
+ boolean preferCachedConnections, long rpcTimeout) throws TTransportException {
checkArgument(context != null, "context is null");
// create list of servers
ArrayList<ThriftTransportKey> servers = new ArrayList<>();
@@ -141,7 +167,7 @@ public class ServerClient {
boolean opened = false;
try {
Pair<String,TTransport> pair = ThriftTransportPool.getInstance().getAnyTransport(servers, preferCachedConnections);
- ClientService.Client client = ThriftUtil.createClient(new ClientService.Client.Factory(), pair.getSecond());
+ CT client = ThriftUtil.createClient(factory, pair.getSecond());
opened = true;
warnedAboutTServersBeingDown = false;
return new Pair<>(pair.getFirst(), client);
@@ -159,7 +185,7 @@ public class ServerClient {
}
}
- public static void close(ClientService.Client client) {
+ public static void close(TServiceClient client) {
if (client != null && client.getInputProtocol() != null && client.getInputProtocol().getTransport() != null) {
ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport());
} else {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index 1c04a43..34b76fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toSet;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
@@ -38,6 +39,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
@@ -48,6 +50,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -69,16 +74,19 @@ import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.client.admin.FindMax;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SummaryRetriever;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
import org.apache.accumulo.core.client.impl.thrift.ClientService;
import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
-import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.client.impl.thrift.TDiskUsage;
import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
@@ -90,6 +98,10 @@ import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.impl.TabletIdImpl;
+import org.apache.accumulo.core.data.thrift.TRowRange;
+import org.apache.accumulo.core.data.thrift.TSummaries;
+import org.apache.accumulo.core.data.thrift.TSummarizerConfiguration;
+import org.apache.accumulo.core.data.thrift.TSummaryRequest;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -103,6 +115,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.summary.SummarizerConfigurationUtil;
+import org.apache.accumulo.core.summary.SummaryCollection;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
@@ -126,6 +140,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
public class TableOperationsImpl extends TableOperationsHelper {
@@ -1661,4 +1676,138 @@ public class TableOperationsImpl extends TableOperationsHelper {
return new LoctionsImpl(binnedRanges);
}
+
+ /**
+ * Creates a {@link SummaryRetriever} for the given table. The returned object only records the requested options (row range, summarizer selection, flush
+ * flag); table lookup and server communication happen in {@code retrieve()}.
+ */
+ @Override
+ public SummaryRetriever summaries(String tableName) {
+
+ return new SummaryRetriever() {
+
+ // a null start or end row means that bound was never set by the caller
+ private Text startRow = null;
+ private Text endRow = null;
+ private List<TSummarizerConfiguration> summariesToFetch = Collections.emptyList();
+ private String summarizerClassRegex;
+ private boolean flush = false;
+
+ @Override
+ public SummaryRetriever startRow(Text startRow) {
+ Objects.requireNonNull(startRow);
+ // only validate the ordering when the other bound has already been supplied
+ if (endRow != null) {
+ Preconditions.checkArgument(startRow.compareTo(endRow) < 0, "Start row must be less than end row : %s >= %s", startRow, endRow);
+ }
+ this.startRow = startRow;
+ return this;
+ }
+
+ @Override
+ public SummaryRetriever startRow(CharSequence startRow) {
+ return startRow(new Text(startRow.toString()));
+ }
+
+ @Override
+ public List<Summary> retrieve() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ // resolve the table id and refuse to query a table whose state is OFFLINE
+ String tableId = Tables.getTableId(context.getInstance(), tableName);
+ if (Tables.getTableState(context.getInstance(), tableId) == TableState.OFFLINE)
+ throw new TableOfflineException(context.getInstance(), tableId);
+
+ TRowRange range = new TRowRange(TextUtil.getByteBuffer(startRow), TextUtil.getByteBuffer(endRow));
+ TSummaryRequest request = new TSummaryRequest(tableId, range, summariesToFetch, summarizerClassRegex);
+ // flush the requested row range first when the caller asked for it via flush(true)
+ if (flush) {
+ _flush(tableId, startRow, endRow, true);
+ }
+
+ // start the request, then keep polling with the returned session id until the response is marked finished
+ TSummaries ret = ServerClient.execute(context, new TabletClientService.Client.Factory(), client -> {
+ TSummaries tsr = client.startGetSummaries(Tracer.traceInfo(), context.rpcCreds(), request);
+ while (!tsr.finished) {
+ tsr = client.contiuneGetSummaries(Tracer.traceInfo(), tsr.sessionId);
+ }
+ return tsr;
+ });
+ return new SummaryCollection(ret).getSummaries();
+ }
+
+ @Override
+ public SummaryRetriever endRow(Text endRow) {
+ Objects.requireNonNull(endRow);
+ // only validate the ordering when the other bound has already been supplied
+ if (startRow != null) {
+ Preconditions.checkArgument(startRow.compareTo(endRow) < 0, "Start row must be less than end row : %s >= %s", startRow, endRow);
+ }
+ this.endRow = endRow;
+ return this;
+ }
+
+ @Override
+ public SummaryRetriever endRow(CharSequence endRow) {
+ return endRow(new Text(endRow.toString()));
+ }
+
+ @Override
+ public SummaryRetriever withConfiguration(Collection<SummarizerConfiguration> configs) {
+ Objects.requireNonNull(configs);
+ // convert to the thrift form once here; this replaces any previously supplied configurations
+ summariesToFetch = configs.stream().map(SummarizerConfigurationUtil::toThrift).collect(Collectors.toList());
+ return this;
+ }
+
+ @Override
+ public SummaryRetriever withConfiguration(SummarizerConfiguration... config) {
+ Objects.requireNonNull(config);
+ return withConfiguration(Arrays.asList(config));
+ }
+
+ @Override
+ public SummaryRetriever withMatchingConfiguration(String regex) {
+ Objects.requireNonNull(regex);
+ // Do a sanity check here to make sure that regex compiles, instead of having it fail on a tserver.
+ Pattern.compile(regex);
+ this.summarizerClassRegex = regex;
+ return this;
+ }
+
+ @Override
+ public SummaryRetriever flush(boolean b) {
+ this.flush = b;
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public void addSummarizers(String tableName, SummarizerConfiguration... newConfigs) throws AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ HashSet<SummarizerConfiguration> currentConfigs = new HashSet<>(SummarizerConfiguration.fromTableProperties(getProperties(tableName)));
+ HashSet<SummarizerConfiguration> newConfigSet = new HashSet<>(Arrays.asList(newConfigs));
+
+ newConfigSet.removeIf(sc -> currentConfigs.contains(sc));
+
+ Set<String> newIds = newConfigSet.stream().map(sc -> sc.getPropertyId()).collect(toSet());
+
+ for (SummarizerConfiguration csc : currentConfigs) {
+ if (newIds.contains(csc.getPropertyId())) {
+ throw new IllegalArgumentException("Summarizer property id is in use by " + csc);
+ }
+ }
+
+ Set<Entry<String,String>> es = SummarizerConfiguration.toTableProperties(newConfigSet).entrySet();
+ for (Entry<String,String> entry : es) {
+ setProperty(tableName, entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Removes every summarizer configured on the table that the predicate accepts, by removing each table property derived from that configuration.
+ */
+ @Override
+ public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException,
+     AccumuloSecurityException {
+   for (SummarizerConfiguration candidate : SummarizerConfiguration.fromTableProperties(getProperties(tableName))) {
+     if (!predicate.test(candidate)) {
+       continue;
+     }
+     for (String propertyKey : candidate.toTableProperties().keySet()) {
+       removeProperty(tableName, propertyKey);
+     }
+   }
+ }
+
+ /**
+ * Returns a new list holding the summarizer configurations parsed from the table's current properties.
+ */
+ @Override
+ public List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+   Collection<SummarizerConfiguration> configured = SummarizerConfiguration.fromTableProperties(getProperties(tableName));
+   return new ArrayList<>(configured);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index 640a85d..d7d2b2d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -23,6 +23,8 @@ import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
@@ -136,6 +138,20 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
FileOutputConfigurator.setSampler(CLASS, job, samplerConfig);
}
+ /**
+ * Specifies a list of summarizer configurations to create summary data in the output file. Each Key Value written will be passed to the configured
+ * {@link Summarizer}s.
+ *
+ * @param job
+ * The Hadoop job instance to be configured
+ * @param sumarizerConfigs
+ * summarizer configurations
+ * @since 2.0.0
+ */
+ public static void setSummarizers(JobConf job, SummarizerConfiguration... sumarizerConfigs) {
+ // delegates to the shared configurator, which records the settings on the job
+ FileOutputConfigurator.setSummarizers(CLASS, job, sumarizerConfigs);
+ }
+
@Override
public RecordWriter<Key,Value> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
// get the path of the temporary output file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index 656dba7..dcdd42b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -23,6 +23,8 @@ import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
@@ -134,6 +136,20 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
FileOutputConfigurator.setSampler(CLASS, job.getConfiguration(), samplerConfig);
}
+ /**
+ * Specifies a list of summarizer configurations to create summary data in the output file. Each Key Value written will be passed to the configured
+ * {@link Summarizer}s.
+ *
+ * @param job
+ * The Hadoop job instance to be configured
+ * @param sumarizerConfigs
+ * summarizer configurations
+ * @since 2.0.0
+ */
+ public static void setSummarizers(Job job, SummarizerConfiguration... sumarizerConfigs) {
+ // delegates to the shared configurator, which records the settings on the job's Configuration
+ FileOutputConfigurator.setSummarizers(CLASS, job.getConfiguration(), sumarizerConfigs);
+ }
+
@Override
public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException {
// get the path of the temporary output file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
index 049395f..5f73e90 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
@@ -22,6 +22,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
@@ -209,4 +210,12 @@ public class FileOutputConfigurator extends ConfiguratorBase {
}
}
+ /**
+ * Converts the given summarizer configurations to table properties and stores each one in the Hadoop configuration, keyed by the
+ * {@code Opts.ACCUMULO_PROPERTIES} configuration key of the implementing class followed by a dot and the property name.
+ *
+ * @param implementingClass
+ *          the class whose name will be used as a prefix for the property configuration key
+ * @param conf
+ *          the Hadoop configuration object to configure
+ * @param sumarizerConfigs
+ *          summarizer configurations to record
+ */
+ public static void setSummarizers(Class<?> implementingClass, Configuration conf, SummarizerConfiguration[] sumarizerConfigs) {
+   Map<String,String> props = SummarizerConfiguration.toTableProperties(sumarizerConfigs);
+
+   // every key shares the same prefix, so build it once instead of once per property
+   String keyPrefix = enumToConfKey(implementingClass, Opts.ACCUMULO_PROPERTIES) + ".";
+   for (Entry<String,String> entry : props.entrySet()) {
+     conf.set(keyPrefix + entry.getKey(), entry.getValue());
+   }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index de89137..de486d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.function.Predicate;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -41,10 +42,12 @@ import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.client.admin.FindMax;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SummaryRetriever;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.impl.TableOperationsHelper;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -502,4 +505,28 @@ class MockTableOperations extends TableOperationsHelper {
public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ // not implemented by the mock; always throws
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addSummarizers(String tableName, SummarizerConfiguration... summarizerConf) throws TableNotFoundException, AccumuloException,
+ AccumuloSecurityException {
+ // not implemented by the mock; always throws
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException,
+ AccumuloSecurityException {
+ // not implemented by the mock; always throws
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+ // not implemented by the mock; always throws
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
index bc5995e..7c3f70e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
@@ -19,12 +19,18 @@ package org.apache.accumulo.core.client.rfile;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.function.Predicate;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
@@ -181,6 +187,119 @@ public class RFile {
}
/**
+ * This is an intermediate interface in a larger builder pattern. Supports setting the required input sources for reading summary data from an RFile.
+ *
+ * @since 2.0.0
+ */
+ public static interface SummaryInputArguments {
+ /**
+ * Specify RFiles to read from. When multiple inputs are specified the summary data will be merged.
+ *
+ * @param inputs
+ * one or more RFiles to read.
+ * @return this
+ */
+ SummaryOptions from(RFileSource... inputs);
+
+ /**
+ * Specify RFiles to read from. When multiple are specified the summary data will be merged. Unlike {@link #from(RFileSource...)}, the returned type also
+ * allows a FileSystem to be supplied through {@link SummaryFSOptions#withFileSystem(FileSystem)}.
+ *
+ * @param files
+ * one or more RFiles to read.
+ * @return this
+ */
+ SummaryFSOptions from(String... files);
+ }
+
+ /**
+ * This is an intermediate interface in a larger builder pattern. Enables optionally setting a FileSystem to read RFile summary data from. Because it extends
+ * {@link SummaryOptions}, the FileSystem step may be skipped and the remaining options used directly.
+ *
+ * @since 2.0.0
+ */
+ public static interface SummaryFSOptions extends SummaryOptions {
+ /**
+ * Optionally provide a FileSystem to open RFiles. If not specified, the FileSystem will be constructed using configuration on the classpath.
+ *
+ * @param fs
+ * use this FileSystem to open files.
+ * @return this
+ */
+ SummaryOptions withFileSystem(FileSystem fs);
+ }
+
+ /**
+ * This is an intermediate interface in a larger builder pattern. Allows setting options for retrieving summary data.
+ *
+ * @since 2.0.0
+ */
+ public static interface SummaryOptions {
+ /**
+ * This method allows retrieving a subset of summary data from a file. If a file has lots of separate summaries, reading a subset may be faster.
+ *
+ * @param summarySelector
+ * Only read summary data that was generated with configuration that this predicate matches.
+ * @return this
+ */
+ SummaryOptions selectSummaries(Predicate<SummarizerConfiguration> summarySelector);
+
+ /**
+ * Summary data may possibly be stored at a more granular level than the entire file. However there is no guarantee of this. If the data was stored at a
+ * more granular level, then this will get a subset of the summary data. The subset will very likely be an inaccurate approximation.
+ *
+ * @param startRow
+ * A non-null start row. The startRow is used exclusively.
+ * @return this
+ *
+ * @see FileStatistics#getExtra()
+ */
+ SummaryOptions startRow(Text startRow);
+
+ /**
+ * Convenience variant of {@link #startRow(Text)} that accepts a character sequence.
+ *
+ * @param startRow
+ * UTF-8 encodes startRow. The startRow is used exclusively.
+ * @return this
+ * @see #startRow(Text)
+ */
+ SummaryOptions startRow(CharSequence startRow);
+
+ /**
+ * Summary data may possibly be stored at a more granular level than the entire file. However there is no guarantee of this. If the data was stored at a
+ * more granular level, then this will get a subset of the summary data. The subset will very likely be an inaccurate approximation.
+ *
+ * @param endRow
+ * A non-null end row. The end row is used inclusively.
+ * @return this
+ *
+ * @see FileStatistics#getExtra()
+ */
+ SummaryOptions endRow(Text endRow);
+
+ /**
+ * Convenience variant of {@link #endRow(Text)} that accepts a character sequence.
+ *
+ * @param endRow
+ * UTF-8 encodes endRow. The end row is used inclusively.
+ * @return this
+ * @see #endRow(Text)
+ */
+ SummaryOptions endRow(CharSequence endRow);
+
+ /**
+ * Reads summary data from file.
+ *
+ * @return The summary data in the file that satisfied the selection criteria.
+ * @throws IOException
+ * if reading the summary data from the input fails
+ */
+ Collection<Summary> read() throws IOException;
+ }
+
+ /**
+ * Entry point for reading summary data from RFiles.
+ *
+ * @return a new builder; its first step is choosing the RFiles to read via one of the {@code from} methods
+ * @since 2.0.0
+ */
+ public static SummaryInputArguments summaries() {
+ return new RFileSummariesRetriever();
+ }
+
+ /**
* This is an intermediate interface in a larger builder pattern. Supports setting the required output sink to write a RFile to.
*
* @since 1.8.0
@@ -224,6 +343,16 @@ public class RFile {
* @since 1.8.0
*/
public static interface WriterOptions {
+
+ /**
+ * Enable generating summary data in the created RFile by running {@link Summarizer}s based on the specified configuration.
+ *
+ * @param summarizerConf
+ * Configuration for summarizer to run.
+ * @return this
+ * @since 2.0.0
+ */
+ public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf);
+
/**
* An option to store sample data in the generated RFile.
*
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
index 3a55172..cfd331a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
@@ -112,12 +112,14 @@ class RFileScannerBuilder implements RFile.InputArguments, RFile.ScannerFSOption
@Override
public ScannerOptions from(RFileSource... inputs) {
+ Objects.requireNonNull(inputs);
opts.in = new InputArgs(inputs);
return this;
}
@Override
public ScannerFSOptions from(String... files) {
+ Objects.requireNonNull(files);
opts.in = new InputArgs(files);
return this;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
new file mode 100644
index 0000000..367172a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.rfile.RFile.SummaryFSOptions;
+import org.apache.accumulo.core.client.rfile.RFile.SummaryInputArguments;
+import org.apache.accumulo.core.client.rfile.RFile.SummaryOptions;
+import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.summary.SummaryReader;
+import org.apache.accumulo.core.summary.Gatherer;
+import org.apache.accumulo.core.summary.SummarizerFactory;
+import org.apache.accumulo.core.summary.SummaryCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Builder implementation behind {@link RFile#summaries()}. It records the input files, an optional FileSystem, an optional row range and a summary selector,
+ * then reads and merges the summary data of all inputs in {@link #read()}.
+ */
+class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions, SummaryOptions {
+
+  // by default every summarizer configuration found in a file is selected
+  private Predicate<SummarizerConfiguration> summarySelector = sc -> true;
+  // a null start or end row means that bound was never set by the caller
+  private Text startRow;
+  private InputArgs in;
+  private Text endRow;
+
+  @Override
+  public SummaryOptions selectSummaries(Predicate<SummarizerConfiguration> summarySelector) {
+    Objects.requireNonNull(summarySelector);
+    this.summarySelector = summarySelector;
+    return this;
+  }
+
+  @Override
+  public SummaryOptions startRow(CharSequence startRow) {
+    return startRow(new Text(startRow.toString()));
+  }
+
+  @Override
+  public SummaryOptions startRow(Text startRow) {
+    Objects.requireNonNull(startRow);
+    this.startRow = startRow;
+    return this;
+  }
+
+  @Override
+  public SummaryOptions endRow(CharSequence endRow) {
+    return endRow(new Text(endRow.toString()));
+  }
+
+  @Override
+  public SummaryOptions endRow(Text endRow) {
+    Objects.requireNonNull(endRow);
+    this.endRow = endRow;
+    return this;
+  }
+
+  /**
+   * Loads the summary data of every configured input, restricted to the configured row range and selector, and merges it into one collection. All input
+   * streams are closed before this method returns, whether or not reading succeeded.
+   *
+   * @return the merged summaries of all inputs
+   * @throws IOException
+   *           if reading fails, or if reading succeeded but closing an input stream failed
+   */
+  @Override
+  public Collection<Summary> read() throws IOException {
+    SummarizerFactory factory = new SummarizerFactory();
+    AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
+    Configuration conf = in.getFileSystem().getConf();
+
+    RFileSource[] sources = in.getSources();
+    Exception readFailure = null;
+    try {
+      SummaryCollection all = new SummaryCollection();
+      for (RFileSource source : sources) {
+        SummaryReader fileSummary = SummaryReader.load(conf, acuconf, source.getInputStream(), source.getLength(), summarySelector, factory);
+        SummaryCollection sc = fileSummary.getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow)));
+        all.merge(sc, factory);
+      }
+
+      return all.getSummaries();
+    } catch (IOException | RuntimeException e) {
+      // remember the primary failure so that close problems are attached to it instead of replacing it
+      readFailure = e;
+      throw e;
+    } finally {
+      closeAll(sources, readFailure);
+    }
+  }
+
+  /**
+   * Closes the input stream of every source, continuing past individual close failures so no stream is left open. Close failures are added as suppressed
+   * exceptions to {@code readFailure} when it is non-null; otherwise the first close failure is thrown with any later ones suppressed on it.
+   */
+  private static void closeAll(RFileSource[] sources, Exception readFailure) throws IOException {
+    IOException closeFailure = null;
+    for (RFileSource source : sources) {
+      try {
+        source.getInputStream().close();
+      } catch (IOException e) {
+        if (readFailure != null) {
+          readFailure.addSuppressed(e);
+        } else if (closeFailure == null) {
+          closeFailure = e;
+        } else {
+          closeFailure.addSuppressed(e);
+        }
+      }
+    }
+    if (closeFailure != null) {
+      throw closeFailure;
+    }
+  }
+
+  @Override
+  public SummaryOptions withFileSystem(FileSystem fs) {
+    Objects.requireNonNull(fs);
+    this.in.fs = fs;
+    return this;
+  }
+
+  @Override
+  public SummaryOptions from(RFileSource... inputs) {
+    Objects.requireNonNull(inputs);
+    in = new InputArgs(inputs);
+    return this;
+  }
+
+  @Override
+  public SummaryFSOptions from(String... files) {
+    Objects.requireNonNull(files);
+    in = new InputArgs(files);
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
index 9995888..9ae7fb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
@@ -210,6 +210,32 @@ public class RFileWriter implements AutoCloseable {
}
/**
+ * This method has the same behavior as {@link #append(Key, Value)}.
+ *
+ * @param key
+ * Same restrictions on key as {@link #append(Key, Value)}.
+ * @param value
+ * this parameter will be UTF-8 encoded. Must be non-null.
+ * @throws IOException
+ * if {@link #append(Key, Value)} throws it
+ * @since 2.0.0
+ */
+ public void append(Key key, CharSequence value) throws IOException {
+ append(key, new Value(value));
+ }
+
+ /**
+ * This method has the same behavior as {@link #append(Key, Value)}.
+ *
+ * @param key
+ * Same restrictions on key as {@link #append(Key, Value)}.
+ * @param value
+ * Must be non-null.
+ * @throws IOException
+ * if {@link #append(Key, Value)} throws it
+ * @since 2.0.0
+ */
+ public void append(Key key, byte[] value) throws IOException {
+ append(key, new Value(value));
+ }
+
+ /**
* Append the keys and values to the last locality group that was started.
*
* @param keyValues
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
index 667cbef..a7decb1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@ -17,6 +17,8 @@
package org.apache.accumulo.core.client.rfile;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
@@ -28,6 +30,7 @@ import java.util.Objects;
import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions;
import org.apache.accumulo.core.client.rfile.RFile.WriterOptions;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.file.FileOperations;
@@ -59,15 +62,21 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
}
private OutputArgs out;
- private SamplerConfiguration sampler = null;
private Map<String,String> tableConfig = Collections.emptyMap();
private int visCacheSize = 1000;
+ private Map<String,String> samplerProps = Collections.emptyMap();
+ private Map<String,String> summarizerProps = Collections.emptyMap();
+
+ /**
+ * Verifies that user supplied properties and properties derived from a sampler or summarizer configuration share no keys.
+ *
+ * @param kind
+ *          word describing the derived properties, inserted into the error message
+ * @throws IllegalArgumentException
+ *           if the two maps have a key in common
+ */
+ private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, String kind) {
+   boolean noSharedKeys = Collections.disjoint(props.keySet(), derivedProps.keySet());
+   checkArgument(noSharedKeys, "Properties and derived %s properties are not disjoint", kind);
+ }
@Override
public WriterOptions withSampler(SamplerConfiguration samplerConf) {
Objects.requireNonNull(samplerConf);
- SamplerConfigurationImpl.checkDisjoint(tableConfig, samplerConf);
- this.sampler = samplerConf;
+ Map<String,String> tmp = new SamplerConfigurationImpl(samplerConf).toTablePropertiesMap();
+ checkDisjoint(tableConfig, tmp, "sampler");
+ this.samplerProps = tmp;
return this;
}
@@ -76,10 +85,10 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
FileOperations fileops = FileOperations.getInstance();
AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
HashMap<String,String> userProps = new HashMap<>();
- if (sampler != null) {
- userProps.putAll(new SamplerConfigurationImpl(sampler).toTablePropertiesMap());
- }
+
userProps.putAll(tableConfig);
+ userProps.putAll(summarizerProps);
+ userProps.putAll(samplerProps);
if (userProps.size() > 0) {
acuconf = new ConfigurationCopy(Iterables.concat(acuconf, userProps.entrySet()));
@@ -92,10 +101,11 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
} else {
fsdo = new FSDataOutputStream(out.getOutputStream(), new FileSystem.Statistics("foo"));
}
- return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf).build(), visCacheSize);
+ return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf)
+ .setAccumuloStartEnabled(false).build(), visCacheSize);
} else {
return new RFileWriter(fileops.newWriterBuilder().forFile(out.path.toString(), out.getFileSystem(), out.getConf()).withTableConfiguration(acuconf)
- .build(), visCacheSize);
+ .setAccumuloStartEnabled(false).build(), visCacheSize);
}
}
@@ -128,7 +138,8 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
cfg.put(entry.getKey(), entry.getValue());
}
- SamplerConfigurationImpl.checkDisjoint(cfg, sampler);
+ checkDisjoint(cfg, samplerProps, "sampler");
+ checkDisjoint(cfg, summarizerProps, "summarizer");
this.tableConfig = cfg;
return this;
}
@@ -145,4 +156,13 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
this.visCacheSize = maxSize;
return this;
}
+
+ /**
+ * Records the table properties derived from the given summarizer configurations, after checking that they do not collide with the table properties
+ * already set on this builder.
+ */
+ @Override
+ public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) {
+   Map<String,String> derived = SummarizerConfiguration.toTableProperties(Objects.requireNonNull(summarizerConf));
+   checkDisjoint(tableConfig, derived, "summarizer");
+   summarizerProps = derived;
+   return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java b/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java
new file mode 100644
index 0000000..a0f9bc5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/summary/CounterSummary.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.summary;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a convenience class for interpreting summary data generated by implementations of {@link CountingSummarizer}.
+ *
+ * @since 2.0.0
+ */
+
+public class CounterSummary {
+ private Map<String,Long> stats;
+
+ /**
+ * This method will call {@link #CounterSummary(Summary, boolean)} with true.
+ */
+ public CounterSummary(Summary summary) {
+ this(summary, true);
+ }
+
+ /**
+ * @param summary
+ * a summary
+ * @param checkType
+ * If true will try to ensure the classname from {@link Summary#getSummarizerConfiguration()} is an instance of {@link CountingSummarizer}. However
+ * this check can only succeed if the class is on the classpath. For cases where the summary data needs to be used and the class is not on the
+ * classpath, set this to false.
+ */
+ public CounterSummary(Summary summary, boolean checkType) {
+ if (checkType) {
+ String className = summary.getSummarizerConfiguration().getClassName();
+ try {
+ getClass().getClassLoader().loadClass(className).asSubclass(CountingSummarizer.class);
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(className + " is not an instance of " + CountingSummarizer.class.getSimpleName(), e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Unable to check summary was produced by a " + CountingSummarizer.class.getSimpleName(), e);
+ }
+ }
+ this.stats = summary.getStatistics();
+ }
+
+ @VisibleForTesting
+ CounterSummary(Map<String,Long> stats) {
+ this.stats = stats;
+ }
+
+ /**
+ * @return statistic for {@link CountingSummarizer#SEEN_STAT}
+ */
+ public long getSeen() {
+ return stats.getOrDefault(CountingSummarizer.SEEN_STAT, 0l);
+ }
+
+ /**
+ * @return statistic for {@link CountingSummarizer#EMITTED_STAT}
+ */
+ public long getEmitted() {
+ return stats.getOrDefault(CountingSummarizer.EMITTED_STAT, 0l);
+ }
+
+ /**
+ * @return the sum of {@link #getTooLong()} and {@link #getTooMany()}
+ */
+ public long getIgnored() {
+ return getTooLong() + getTooMany();
+ }
+
+ /**
+ * @return statistic for {@link CountingSummarizer#TOO_LONG_STAT}
+ */
+ public long getTooLong() {
+ return stats.getOrDefault(CountingSummarizer.TOO_LONG_STAT, 0l);
+ }
+
+ /**
+ * @return statistic for {@link CountingSummarizer#TOO_MANY_STAT}
+ */
+ public long getTooMany() {
+ return stats.getOrDefault(CountingSummarizer.TOO_MANY_STAT, 0l);
+ }
+
+ /**
+ * @return statistic for {@link CountingSummarizer#DELETES_IGNORED_STAT}
+ */
+ public long getDeletesIgnored() {
+ return stats.getOrDefault(CountingSummarizer.DELETES_IGNORED_STAT, 0l);
+ }
+
+ /**
+ * @return All statistics with a prefix of {@link CountingSummarizer#COUNTER_STAT_PREFIX} with the prefix stripped off.
+ */
+ public Map<String,Long> getCounters() {
+ HashMap<String,Long> ret = new HashMap<>();
+ for (Entry<String,Long> entry : stats.entrySet()) {
+ if (entry.getKey().startsWith(CountingSummarizer.COUNTER_STAT_PREFIX)) {
+ ret.put(entry.getKey().substring(CountingSummarizer.COUNTER_STAT_PREFIX.length()), entry.getValue());
+ }
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java b/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java
new file mode 100644
index 0000000..b3e1b68
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/summary/CountingSummarizer.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.summary;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.mutable.MutableLong;
+
+//checkstyle and formatter are in conflict
+//@formatter:off
+/**
+ * This class counts arbitrary keys while defending against too many keys and keys that are too long.
+ *
+ * <p>
+ * During collection and summarization this class will use the functions from {@link #converter()} and {@link #encoder()}. For each key/value the function from
+ * {@link #converter()} will be called to create zero or more counter objects. A counter associated with each counter object will be incremented, as long as
+ * there are not too many counters and the counter object is not too long.
+ *
+ * <p>
+ * When {@link Summarizer.Collector#summarize(Summarizer.StatisticConsumer)} is called, the function from {@link #encoder()} will be used to convert counter
+ * objects to strings. These strings will be used to emit statistics. Overriding {@link #encoder()} is optional. One reason to override is if the counter object
+ * contains binary or special data. For example, a function that base64 encodes counter objects could be created.
+ *
+ * <p>
+ * If the counter key type is mutable, then consider overriding {@link #copier()}.
+ *
+ * <p>
+ * The function returned by {@link #converter()} will be called frequently and should be very efficient. The function returned by {@link #encoder()} will be
+ * called less frequently and can be more expensive. The reason these two functions exist is to avoid the conversion to string for each key value, if that
+ * conversion is unnecessary.
+ *
+ * <p>
+ * Below is an example implementation that counts column visibilities. This example avoids converting column visibility to string for each key/value. This
+ * example shows the source code for {@link VisibilitySummarizer}.
+ *
+ * <pre>
+ * <code>
+ * public class VisibilitySummarizer extends CountingSummarizer<ByteSequence> {
+ * @Override
+ * protected UnaryOperator<ByteSequence> copier() {
+ * // ByteSequences are mutable, so override and provide a copy function
+ * return ArrayByteSequence::new;
+ * }
+ *
+ * @Override
+ * protected Converter<ByteSequence> converter() {
+ * return (key, val, consumer) -> consumer.accept(key.getColumnVisibilityData());
+ * }
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param <K>
+ * The counter key type. This type must have good implementations of {@link Object#hashCode()} and {@link Object#equals(Object)}.
+ * @see CounterSummary
+ * @since 2.0.0
+ */
+//@formatter:on
+public abstract class CountingSummarizer<K> implements Summarizer {
+
+ /**
+ * A configuration option for specifying the maximum number of unique counters an instance of this summarizer should track. If not specified, a default of
+ * {@value #MAX_COUNTER_DEFAULT} will be used.
+ */
+ public static final String MAX_COUNTERS_OPT = "maxCounters";
+
+ /**
+ * A configuration option for specifying the maximum length of an individual counter key. If not specified, a default of {@value #MAX_CKL_DEFAULT} will be
+ * used.
+ */
+ public static final String MAX_COUNTER_LEN_OPT = "maxCounterLen";
+
+ /**
+ * A configuration option to determine if delete keys should be counted. If set to true then delete keys will not be passed to the {@link Converter} and the
+ * statistic {@value #DELETES_IGNORED_STAT} will track the number of deletes ignored. This option defaults to {@value #INGNORE_DELETES_DEFAULT}.
+ */
+ public static final String INGNORE_DELETES_OPT = "ignoreDeletes";
+
+ /**
+ * This prefixes all counters when emitting statistics in {@link Summarizer.Collector#summarize(Summarizer.StatisticConsumer)}.
+ */
+ public static final String COUNTER_STAT_PREFIX = "c:";
+
+ /**
+ * This is the name of the statistic that tracks how many counter objects were ignored because the number of unique counters was exceeded. The max number of
+ * unique counters is specified by {@link #MAX_COUNTERS_OPT}.
+ */
+ public static final String TOO_MANY_STAT = "tooMany";
+
+ /**
+ * This is the name of the statistic that tracks how many counter objects were ignored because they were too long. The maximum length is specified by
+ * {@link #MAX_COUNTER_LEN_OPT}.
+ */
+ public static final String TOO_LONG_STAT = "tooLong";
+
+ /**
+ * This is the name of the statistic that tracks the total number of counter objects emitted by the {@link Converter}. This includes emitted Counter objects
+ * that were ignored.
+ */
+ public static final String EMITTED_STAT = "emitted";
+
+ /**
+ * This is the name of the statistic that tracks the total number of deleted keys seen. This statistic is only incremented when the
+ * {@value #INGNORE_DELETES_OPT} option is set to true.
+ */
+ public static final String DELETES_IGNORED_STAT = "deletesIgnored";
+
+ /**
+ * This tracks the total number of key/values seen by the {@link Summarizer.Collector}
+ */
+ public static final String SEEN_STAT = "seen";
+
+ // this default can not be changed as persisted summary data depends on it. See the documentation about persistence in the Summarizer class javadoc.
+ public static final String MAX_COUNTER_DEFAULT = "1024";
+
+ // this default can not be changed as persisted summary data depends on it
+ public static final String MAX_CKL_DEFAULT = "128";
+
+ // this default can not be changed as persisted summary data depends on it
+ public static final String INGNORE_DELETES_DEFAULT = "true";
+
+ private static final String[] ALL_STATS = new String[] {TOO_LONG_STAT, TOO_MANY_STAT, EMITTED_STAT, SEEN_STAT, DELETES_IGNORED_STAT};
+
+ private int maxCounters;
+ private int maxCounterKeyLen;
+ private boolean ignoreDeletes;
+
+ private void init(SummarizerConfiguration conf) {
+ maxCounters = Integer.parseInt(conf.getOptions().getOrDefault(MAX_COUNTERS_OPT, MAX_COUNTER_DEFAULT));
+ maxCounterKeyLen = Integer.parseInt(conf.getOptions().getOrDefault(MAX_COUNTER_LEN_OPT, MAX_CKL_DEFAULT));
+ ignoreDeletes = Boolean.parseBoolean(conf.getOptions().getOrDefault(INGNORE_DELETES_OPT, INGNORE_DELETES_DEFAULT));
+ }
+
+ /**
+ * A function that converts key values to zero or more counter objects.
+ *
+ * @since 2.0.0
+ */
+ public static interface Converter<K> {
+ /**
+ * @param consumer
+ * emit counter objects derived from key and value to this consumer
+ */
+ public void convert(Key k, Value v, Consumer<K> consumer);
+ }
+
+ /**
+ *
+ * @return A function that is used to convert each key value to zero or more counter objects. Each function returned should be independent.
+ */
+ protected abstract Converter<K> converter();
+
+ /**
+ * @return A function that is used to convert counter objects to String. The default function calls {@link Object#toString()} on the counter object.
+ */
+ protected Function<K,String> encoder() {
+ return Object::toString;
+ }
+
+ /**
+ * Override this if your key type is mutable and subject to change.
+ *
+ * @return a function that is used to copy the counter object. This function is only used when the collector has never seen the counter object before. In this
+ * case the collector needs to possibly copy the counter object before using as map key. The default implementation is the
+ * {@link UnaryOperator#identity()} function.
+ */
+ protected UnaryOperator<K> copier() {
+ return UnaryOperator.identity();
+ }
+
+ @Override
+ public Collector collector(SummarizerConfiguration sc) {
+ init(sc);
+ return new Collector() {
+
+ // Map used for computing the summary incrementally is keyed by the counter object (type K), which is more efficient than converting to String for each
+ // Key. The conversion to String for emitting statistics is deferred until the summary is requested.
+
+ private Map<K,MutableLong> counters = new HashMap<>();
+ private long tooMany = 0;
+ private long tooLong = 0;
+ private long seen = 0;
+ private long emitted = 0;
+ private long deleted = 0;
+ private Converter<K> converter = converter();
+ private Function<K,String> encoder = encoder();
+ private UnaryOperator<K> copier = copier();
+
+ private void incrementCounter(K counter) {
+ emitted++;
+
+ MutableLong ml = counters.get(counter);
+ if (ml == null) {
+ if (counters.size() >= maxCounters) {
+ // no need to store this counter in the map and get() it... just use instance variable
+ tooMany++;
+ } else {
+ // we have never seen this key before, check if it's too long
+ if (encoder.apply(counter).length() >= maxCounterKeyLen) {
+ tooLong++;
+ } else {
+ counters.put(copier.apply(counter), new MutableLong(1));
+ }
+ }
+ } else {
+ // using mutable long allows calling put() to be avoided
+ ml.increment();
+ }
+ }
+
+ @Override
+ public void accept(Key k, Value v) {
+ seen++;
+ if (ignoreDeletes && k.isDeleted()) {
+ deleted++;
+ } else {
+ converter.convert(k, v, this::incrementCounter);
+ }
+ }
+
+ @Override
+ public void summarize(StatisticConsumer sc) {
+ StringBuilder sb = new StringBuilder(COUNTER_STAT_PREFIX);
+
+ for (Entry<K,MutableLong> entry : counters.entrySet()) {
+ sb.setLength(COUNTER_STAT_PREFIX.length());
+ sb.append(encoder.apply(entry.getKey()));
+ sc.accept(sb.toString(), entry.getValue().longValue());
+ }
+
+ sc.accept(TOO_MANY_STAT, tooMany);
+ sc.accept(TOO_LONG_STAT, tooLong);
+ sc.accept(EMITTED_STAT, emitted);
+ sc.accept(SEEN_STAT, seen);
+ sc.accept(DELETES_IGNORED_STAT, deleted);
+ }
+ };
+ }
+
+ @Override
+ public Combiner combiner(SummarizerConfiguration sc) {
+ init(sc);
+ return new Combiner() {
+
+ @Override
+ public void merge(Map<String,Long> summary1, Map<String,Long> summary2) {
+
+ for (String key : ALL_STATS) {
+ summary1.merge(key, summary2.getOrDefault(key, 0l), Long::sum);
+ }
+
+ for (Entry<String,Long> entry : summary2.entrySet()) {
+ String k2 = entry.getKey();
+ Long v2 = entry.getValue();
+
+ if (k2.startsWith(COUNTER_STAT_PREFIX)) {
+ summary1.merge(k2, v2, Long::sum);
+ }
+ }
+
+ if (summary1.size() - ALL_STATS.length > maxCounters) {
+ // find the keys with the lowest counts to remove
+ List<String> keysToRemove = summary1.entrySet().stream().filter(e -> e.getKey().startsWith(COUNTER_STAT_PREFIX)) // filter out non counters
+ .sorted((e1, e2) -> Long.compare(e2.getValue(), e1.getValue())) // sort descending by count
+ .skip(maxCounters) // skip most frequent
+ .map(e -> e.getKey()).collect(Collectors.toList()); // collect the least frequent counters in a list
+
+ long removedCount = 0;
+ for (String key : keysToRemove) {
+ removedCount += summary1.remove(key);
+ }
+
+ summary1.merge(TOO_MANY_STAT, removedCount, Long::sum);
+ }
+ }
+ };
+ }
+}