You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/03/20 14:48:59 UTC
[3/9] accumulo git commit: ACCUMULO-4501 ACCUMULO-96 Added
Summarization
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java b/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java
new file mode 100644
index 0000000..9da91c0
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
 * A simple {@link Future} wrapper that sets an {@link AtomicBoolean} to {@code true} when the wrapped
 * future is successfully canceled through {@link #cancel(boolean)}. Every other method delegates
 * directly to the wrapped future.
 *
 * @param <T>
 *          the result type of the wrapped future
 */
public class CancelFlagFuture<T> implements Future<T> {

  // Neither reference is ever reassigned after construction, so both are final.
  private final Future<T> wrappedFuture;
  private final AtomicBoolean cancelFlag;

  /**
   * @param wrappedFuture
   *          the future that every call is delegated to
   * @param cancelFlag
   *          set to {@code true} when {@link #cancel(boolean)} succeeds; this class never resets it
   */
  public CancelFlagFuture(Future<T> wrappedFuture, AtomicBoolean cancelFlag) {
    this.wrappedFuture = wrappedFuture;
    this.cancelFlag = cancelFlag;
  }

  /**
   * Attempts to cancel the wrapped future and, only if that attempt succeeds, sets the cancel flag.
   *
   * @param mayInterruptIfRunning
   *          passed unchanged to the wrapped future's {@code cancel}
   * @return the value returned by the wrapped future's {@code cancel}
   */
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    boolean canceled = wrappedFuture.cancel(mayInterruptIfRunning);
    if (canceled) {
      // Per the Future contract, cancel returns false when the task already completed or could not
      // be canceled; the flag is raised only when cancellation actually took effect.
      cancelFlag.set(true);
    }
    return canceled;
  }

  /** Delegates to the wrapped future. */
  @Override
  public boolean isCancelled() {
    return wrappedFuture.isCancelled();
  }

  /** Delegates to the wrapped future. */
  @Override
  public boolean isDone() {
    return wrappedFuture.isDone();
  }

  /** Delegates to the wrapped future. */
  @Override
  public T get() throws InterruptedException, ExecutionException {
    return wrappedFuture.get();
  }

  /** Delegates to the wrapped future. */
  @Override
  public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    return wrappedFuture.get(timeout, unit);
  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
new file mode 100644
index 0000000..c417b0f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
/** Helpers for combining {@link CompletableFuture}s. */
public class CompletableFutureUtil {

  /**
   * Merges a list of futures into one by building a balanced binary tree of
   * {@link CompletableFuture#thenCombine} operations. Each internal node combines the results of its two
   * children with {@code mergeFunc} once both complete. On a level with an odd number of futures, the
   * last one is carried up to the next level unchanged. Left-to-right order of the inputs is preserved
   * in every combination.
   *
   * @param futures
   *          the futures to merge; the list itself is not modified
   * @param mergeFunc
   *          combines two partial results into one
   * @param nothing
   *          supplies the result when {@code futures} is empty
   * @return an already-completed future holding {@code nothing.get()} when {@code futures} is empty, the
   *         sole element itself when {@code futures} has exactly one element, otherwise a future that
   *         completes with the merged result of all inputs
   */
  public static <T> CompletableFuture<T> merge(List<? extends CompletableFuture<T>> futures, BiFunction<? super T,? super T,? extends T> mergeFunc,
      Supplier<? extends T> nothing) {
    if (futures.isEmpty()) {
      return CompletableFuture.completedFuture(nothing.get());
    }

    // Copy once into an ArrayList so the indexed access below is O(1) even for non-random-access inputs.
    List<CompletableFuture<T>> level = new ArrayList<>(futures);
    while (level.size() > 1) {
      // ceil(size / 2) slots: one per pair plus one for a trailing unpaired future.
      List<CompletableFuture<T>> nextLevel = new ArrayList<>((level.size() + 1) / 2);
      for (int i = 0; i < level.size(); i += 2) {
        if (i + 1 == level.size()) {
          nextLevel.add(level.get(i));
        } else {
          nextLevel.add(level.get(i).thenCombine(level.get(i + 1), mergeFunc));
        }
      }
      level = nextLevel;
    }

    return level.get(0);
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/thrift/data.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/data.thrift b/core/src/main/thrift/data.thrift
index 3e5c56c..648c37d 100644
--- a/core/src/main/thrift/data.thrift
+++ b/core/src/main/thrift/data.thrift
@@ -149,6 +149,40 @@ struct TConditionalSession {
3:i64 ttl;
}
+struct TSummarizerConfiguration { // Wire form of one summarizer configuration.
+ 1:string classname // summarizer implementation class name (NOTE(review): presumably fully qualified — confirm)
+ 2:map<string, string> options // summarizer options as string key/value pairs
+ 3:string configId // NOTE(review): identifier for this configuration; how it is derived is not defined here — confirm
+}
+
+struct TSummary { // Statistics from one summarizer, together with file counts.
+ 1:map<string, i64> summary // statistic name -> value
+ 2:TSummarizerConfiguration config // configuration of the summarizer that produced these statistics
+ 3:i64 filesContaining // NOTE(review): file-count semantics are not defined in this IDL — document what is counted
+ 4:i64 filesExceeding // NOTE(review): presumably files whose data extends past the requested bounds — confirm
+ 5:i64 filesLarge // NOTE(review): presumably files whose summary data was too large to keep — confirm
+}
+
+struct TSummaries { // Result of a (possibly partial) summary retrieval.
+ 1:bool finished // NOTE(review): presumably false means more results must be fetched via contiuneGetSummaries — confirm
+ 2:i64 sessionId // passed to contiuneGetSummaries to continue this retrieval
+ 3:i64 totalFiles // NOTE(review): document whether this counts files examined or all files in range
+ 4:i64 deletedFiles // NOTE(review): semantics not defined here — confirm
+ 5:list<TSummary> summaries // NOTE(review): presumably one entry per summarizer configuration — confirm
+}
+
+struct TRowRange { // Row range used as TSummaryRequest.bounds and as per-file ranges in startGetSummariesFromFiles.
+ 1:binary startRow // NOTE(review): document inclusivity and the meaning of an unset/empty value
+ 2:binary endRow // NOTE(review): document inclusivity and the meaning of an unset/empty value
+}
+
+struct TSummaryRequest { // Parameters of a summary retrieval.
+ 1:string tableId
+ 2:TRowRange bounds // NOTE(review): presumably the row range the summaries should cover — confirm
+ 3:list<TSummarizerConfiguration> summarizers // NOTE(review): presumably selects specific summarizer configurations — confirm
+ 4:string summarizerPattern // NOTE(review): presumably a regex matched against summarizer configurations — confirm
+}
+
typedef map<TKeyExtent,list<TConditionalMutation>> CMBatch
typedef map<TKeyExtent,list<TMutation>> UpdateBatch
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 7697a2d..b56449f 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -230,6 +230,11 @@ service TabletClientService extends client.ClientService {
list<ActiveCompaction> getActiveCompactions(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec)
oneway void removeLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<string> filenames)
list<string> getActiveLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials)
+
+ data.TSummaries startGetSummaries(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:data.TSummaryRequest request) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException tope) // begins a summary retrieval for request.tableId
+ data.TSummaries startGetSummariesForPartition(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:data.TSummaryRequest request, 4:i32 modulus, 5:i32 remainder) throws (1:client.ThriftSecurityException sec) // NOTE(review): restricted to the partition selected by modulus/remainder; document what is partitioned
+ data.TSummaries startGetSummariesFromFiles(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:data.TSummaryRequest request, 4:map<string,list<data.TRowRange>> files) throws (1:client.ThriftSecurityException sec) // summarizes the given files, each limited to its list of row ranges (keys presumably file paths — confirm)
+ data.TSummaries contiuneGetSummaries(1:trace.TInfo tinfo, 2:i64 sessionId) throws (1:NoSuchScanIDException nssi) // continues the retrieval identified by sessionId; NOTE(review): "contiune" is a typo for "continue" — renaming changes the generated client/server API, so fix before release or keep as-is
}
typedef i32 TabletID
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
index 1d699c2..496cde3 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
+import java.util.function.Predicate;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -37,8 +38,10 @@ import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.client.admin.Locations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SummaryRetriever;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.security.Authorizations;
@@ -249,6 +252,29 @@ public class TableOperationsHelperTest {
public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ throw new UnsupportedOperationException(); // stub: summary operations are not supported by this test double
+ }
+
+ @Override
+ public void addSummarizers(String tableName, SummarizerConfiguration... summarizerConf) throws TableNotFoundException, AccumuloException,
+ AccumuloSecurityException {
+ throw new UnsupportedOperationException(); // stub: summary operations are not supported by this test double
+
+ }
+
+ @Override
+ public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException,
+ AccumuloSecurityException {
+ throw new UnsupportedOperationException(); // stub: summary operations are not supported by this test double
+ }
+
+ @Override
+ public List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+ throw new UnsupportedOperationException(); // stub: summary operations are not supported by this test double
+ }
}
protected TableOperationsHelper getHelper() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
index d85db92..367c833 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
@@ -17,12 +17,18 @@
package org.apache.accumulo.core.client.mapred;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Collection;
import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
import org.apache.accumulo.core.client.sample.RowSampler;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.CountingSummarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -43,6 +49,9 @@ public class AccumuloFileOutputFormatTest {
samplerConfig.addOption("hasher", "murmur3_32");
samplerConfig.addOption("modulus", "109");
+ SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 2048).build();
+ SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
+
JobConf job = new JobConf();
AccumuloFileOutputFormat.setReplication(job, a);
AccumuloFileOutputFormat.setFileBlockSize(job, b);
@@ -50,6 +59,7 @@ public class AccumuloFileOutputFormatTest {
AccumuloFileOutputFormat.setIndexBlockSize(job, d);
AccumuloFileOutputFormat.setCompressionType(job, e);
AccumuloFileOutputFormat.setSampler(job, samplerConfig);
+ AccumuloFileOutputFormat.setSummarizers(job, sc1, sc2);
AccumuloConfiguration acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
@@ -60,6 +70,11 @@ public class AccumuloFileOutputFormatTest {
assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
+ Collection<SummarizerConfiguration> summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf);
+ assertEquals(2, summarizerConfigs.size());
+ assertTrue(summarizerConfigs.contains(sc1));
+ assertTrue(summarizerConfigs.contains(sc2));
+
a = 17;
b = 1300l;
c = 150l;
@@ -85,5 +100,8 @@ public class AccumuloFileOutputFormatTest {
assertEquals(110l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
+
+ summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf);
+ assertEquals(0, summarizerConfigs.size());
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
index 39d226b..5143453 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
@@ -17,12 +17,18 @@
package org.apache.accumulo.core.client.mapreduce;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Collection;
import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
import org.apache.accumulo.core.client.sample.RowSampler;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.CountingSummarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -43,6 +49,9 @@ public class AccumuloFileOutputFormatTest {
samplerConfig.addOption("hasher", "murmur3_32");
samplerConfig.addOption("modulus", "109");
+ SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 2048).build();
+ SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
+
Job job1 = Job.getInstance();
AccumuloFileOutputFormat.setReplication(job1, a);
AccumuloFileOutputFormat.setFileBlockSize(job1, b);
@@ -50,6 +59,7 @@ public class AccumuloFileOutputFormatTest {
AccumuloFileOutputFormat.setIndexBlockSize(job1, d);
AccumuloFileOutputFormat.setCompressionType(job1, e);
AccumuloFileOutputFormat.setSampler(job1, samplerConfig);
+ AccumuloFileOutputFormat.setSummarizers(job1, sc1, sc2);
AccumuloConfiguration acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job1.getConfiguration());
@@ -60,6 +70,11 @@ public class AccumuloFileOutputFormatTest {
assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
+ Collection<SummarizerConfiguration> summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf);
+ assertEquals(2, summarizerConfigs.size());
+ assertTrue(summarizerConfigs.contains(sc1));
+ assertTrue(summarizerConfigs.contains(sc2));
+
a = 17;
b = 1300l;
c = 150l;
@@ -86,5 +101,8 @@ public class AccumuloFileOutputFormatTest {
assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
+ summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf);
+ assertEquals(0, summarizerConfigs.size());
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
index 4993810..3352b0f 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -36,6 +37,11 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.sample.RowSampler;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.CounterSummary;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ArrayByteSequence;
@@ -83,6 +89,10 @@ public class RFileTest {
}
private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families, int qualifiers) {
+ return createTestData(startRow, rows, startFamily, families, qualifiers, ""); // delegate with a single, empty column visibility
+ }
+
+ private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families, int qualifiers, String... vis) {
TreeMap<Key,Value> testData = new TreeMap<>();
for (int r = 0; r < rows; r++) {
@@ -91,8 +101,10 @@ public class RFileTest {
String fam = colStr(f + startFamily);
for (int q = 0; q < qualifiers; q++) {
String qual = colStr(q);
- Key k = new Key(row, fam, qual);
- testData.put(k, new Value((k.hashCode() + "").getBytes()));
+ for (String v : vis) {
+ Key k = new Key(row, fam, qual, v);
+ testData.put(k, new Value((k.hashCode() + "").getBytes()));
+ }
}
}
}
@@ -498,6 +510,148 @@ public class RFileTest {
scanner.close();
}
+ @Test
+ public void testSummaries() throws Exception { // writes RFiles with visibility and family summarizers and verifies the summaries read back
+ SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).build();
+ SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).build();
+
+ LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+ String testFile = createTmpTestFile();
+
+ SortedMap<Key,Value> testData1 = createTestData(0, 100, 0, 4, 1, "A&B", "A&B&C"); // rows 0-99 x 4 families x 1 qualifier x 2 visibilities = 800 entries
+
+ RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).withSummarizers(sc1, sc2).build();
+ writer.append(testData1.entrySet());
+ writer.close();
+
+ // verify summary data
+ Collection<Summary> summaries = RFile.summaries().from(testFile).withFileSystem(localFs).read();
+ Assert.assertEquals(2, summaries.size());
+ for (Summary summary : summaries) {
+ Assert.assertEquals(0, summary.getFileStatistics().getInaccurate());
+ Assert.assertEquals(1, summary.getFileStatistics().getTotal());
+ String className = summary.getSummarizerConfiguration().getClassName();
+ CounterSummary counterSummary = new CounterSummary(summary);
+ if (className.equals(FamilySummarizer.class.getName())) {
+ Map<String,Long> counters = counterSummary.getCounters();
+ Map<String,Long> expected = ImmutableMap.of("0000", 200l, "0001", 200l, "0002", 200l, "0003", 200l); // per family: 100 rows x 2 visibilities
+ Assert.assertEquals(expected, counters);
+ } else if (className.equals(VisibilitySummarizer.class.getName())) {
+ Map<String,Long> counters = counterSummary.getCounters();
+ Map<String,Long> expected = ImmutableMap.of("A&B", 400l, "A&B&C", 400l); // per visibility: 100 rows x 4 families
+ Assert.assertEquals(expected, counters);
+ } else {
+ Assert.fail("Unexpected classname " + className);
+ }
+ }
+
+ // check if writing summary data impacted normal rfile functionality
+ Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withAuthorizations(new Authorizations("A", "B", "C")).build();
+ Assert.assertEquals(testData1, toMap(scanner));
+ scanner.close();
+
+ String testFile2 = createTmpTestFile();
+ SortedMap<Key,Value> testData2 = createTestData(100, 100, 0, 4, 1, "A&B", "A&B&C"); // rows 100-199, same families/qualifier/visibilities as testData1
+ writer = RFile.newWriter().to(testFile2).withFileSystem(localFs).withSummarizers(sc1, sc2).build();
+ writer.append(testData2.entrySet());
+ writer.close();
+
+ // verify reading summaries from multiple files works
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).read();
+ Assert.assertEquals(2, summaries.size());
+ for (Summary summary : summaries) {
+ Assert.assertEquals(0, summary.getFileStatistics().getInaccurate());
+ Assert.assertEquals(2, summary.getFileStatistics().getTotal());
+ String className = summary.getSummarizerConfiguration().getClassName();
+ CounterSummary counterSummary = new CounterSummary(summary);
+ if (className.equals(FamilySummarizer.class.getName())) {
+ Map<String,Long> counters = counterSummary.getCounters();
+ Map<String,Long> expected = ImmutableMap.of("0000", 400l, "0001", 400l, "0002", 400l, "0003", 400l);
+ Assert.assertEquals(expected, counters);
+ } else if (className.equals(VisibilitySummarizer.class.getName())) {
+ Map<String,Long> counters = counterSummary.getCounters();
+ Map<String,Long> expected = ImmutableMap.of("A&B", 800l, "A&B&C", 800l);
+ Assert.assertEquals(expected, counters);
+ } else {
+ Assert.fail("Unexpected classname " + className);
+ }
+ }
+
+ // verify reading a subset of summaries works
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 0);
+
+ // the following tests check boundary conditions for start row and end row; the last checkSummaries argument is the expected inaccurate/extra file count
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(99)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 0);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(98)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(0)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow("#").read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 0);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(100)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(99)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 0);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(100)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(199)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 0);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50))
+ .endRow(rowStr(150)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 2);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(120))
+ .endRow(rowStr(150)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50))
+ .endRow(rowStr(199)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow("#").endRow(rowStr(150))
+ .read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(199)).read();
+ checkSummaries(summaries, ImmutableMap.of(), 0);
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(200)).read();
+ checkSummaries(summaries, ImmutableMap.of(), 0);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow("#").read();
+ checkSummaries(summaries, ImmutableMap.of(), 0);
+
+ summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(0)).read();
+ checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1);
+ }
+
+ private void checkSummaries(Collection<Summary> summaries, Map<String,Long> expected, int extra) { // asserts exactly one VisibilitySummarizer summary over 2 files
+ Assert.assertEquals(1, summaries.size());
+ for (Summary summary : summaries) {
+ Assert.assertEquals(extra, summary.getFileStatistics().getInaccurate()); // extra is expected for both the inaccurate and extra file counts
+ Assert.assertEquals(extra, summary.getFileStatistics().getExtra());
+ Assert.assertEquals(2, summary.getFileStatistics().getTotal());
+ String className = summary.getSummarizerConfiguration().getClassName();
+ CounterSummary counterSummary = new CounterSummary(summary);
+ if (className.equals(VisibilitySummarizer.class.getName())) {
+ Map<String,Long> counters = counterSummary.getCounters();
+
+ Assert.assertEquals(expected, counters);
+ } else {
+ Assert.fail("Unexpected classname " + className);
+ }
+ }
+ }
+
@Test(expected = IllegalArgumentException.class)
public void testOutOfOrder() throws Exception {
// test that exception declared in API is thrown
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java b/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java
new file mode 100644
index 0000000..06f8d35
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.summary;
+
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.COUNTER_STAT_PREFIX;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.DELETES_IGNORED_STAT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.EMITTED_STAT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.INGNORE_DELETES_OPT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.MAX_COUNTERS_OPT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.MAX_COUNTER_LEN_OPT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.SEEN_STAT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.TOO_LONG_STAT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.TOO_MANY_STAT;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.accumulo.core.client.summary.CounterSummary;
+import org.apache.accumulo.core.client.summary.CountingSummarizer;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.Summarizer.Collector;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CountingSummarizerTest {
+
+  /**
+   * Summarizer used by {@link #testMultipleEmit()}. For every key it emits three counters: the first
+   * two characters of the row, column family and column qualifier, prefixed with {@code rp:},
+   * {@code fp:} and {@code qp:} respectively.
+   */
+  public static class MultiSummarizer extends CountingSummarizer<String> {
+    @Override
+    protected Converter<String> converter() {
+      // assumes every row, family and qualifier is at least two bytes long (true for the test data)
+      return (k, v, c) -> {
+        c.accept("rp:" + k.getRowData().subSequence(0, 2).toString());
+        c.accept("fp:" + k.getColumnFamilyData().subSequence(0, 2).toString());
+        c.accept("qp:" + k.getColumnQualifierData().subSequence(0, 2).toString());
+      };
+    }
+  }
+
+  /** A converter that emits several values per key must have every emitted value counted. */
+  @Test
+  public void testMultipleEmit() {
+    SummarizerConfiguration sc = SummarizerConfiguration.builder(MultiSummarizer.class).build();
+    MultiSummarizer countSum = new MultiSummarizer();
+
+    Summarizer.Collector collector = countSum.collector(sc);
+
+    Value val = new Value("abc");
+
+    HashMap<String,Long> expected = new HashMap<>();
+
+    // 4 rows x 4 families x 4 qualifiers = 64 keys, each emitting 3 counters
+    for (String row : new String[] {"ask", "asleep", "some", "soul"}) {
+      for (String fam : new String[] {"hop", "hope", "nope", "noop"}) {
+        for (String qual : new String[] {"mad", "lad", "lab", "map"}) {
+          collector.accept(new Key(row, fam, qual), val);
+
+          expected.merge("rp:" + row.substring(0, 2), 1L, Long::sum);
+          expected.merge("fp:" + fam.substring(0, 2), 1L, Long::sum);
+          expected.merge("qp:" + qual.substring(0, 2), 1L, Long::sum);
+        }
+      }
+    }
+
+    HashMap<String,Long> stats = new HashMap<>();
+    collector.summarize(stats::put);
+
+    CounterSummary csum = new CounterSummary(stats);
+    Assert.assertEquals(expected, csum.getCounters());
+    Assert.assertEquals(64, csum.getSeen());
+    Assert.assertEquals(3 * 64, csum.getEmitted());
+    Assert.assertEquals(0, csum.getIgnored());
+    Assert.assertEquals(0, csum.getDeletesIgnored());
+  }
+
+  /**
+   * Exercises the max-counters and max-counter-length limits and checks that deletes are ignored by
+   * default.
+   */
+  @Test
+  public void testSummarizing() {
+    SummarizerConfiguration sc = SummarizerConfiguration.builder(FamilySummarizer.class).addOptions(MAX_COUNTERS_OPT, "5", MAX_COUNTER_LEN_OPT, "10").build();
+    FamilySummarizer countSum = new FamilySummarizer();
+
+    Value val = new Value("abc");
+
+    Summarizer.Collector collector = countSum.collector(sc);
+    for (String fam : Arrays.asList("f1", "f1", "f1", "f2", "f1", "f70000000000000000000", "f70000000000000000001", "f2", "f3", "f4", "f5", "f6", "f7", "f3",
+        "f7")) {
+      collector.accept(new Key("r", fam), val);
+    }
+
+    Key dk = new Key("r", "f2");
+    dk.setDeleted(true);
+    collector.accept(dk, new Value(""));
+
+    HashMap<String,Long> stats = new HashMap<>();
+    collector.summarize(stats::put);
+
+    String p = COUNTER_STAT_PREFIX;
+
+    // f1-f5 are the first five distinct families within the length limit and get counters. f6 (1)
+    // and f7 (2) are counted as tooMany. The two 21-character families are counted as tooLong. The
+    // delete is seen but not emitted.
+    HashMap<String,Long> expected = new HashMap<>();
+    expected.put(p + "f1", 4L);
+    expected.put(p + "f2", 2L);
+    expected.put(p + "f3", 2L);
+    expected.put(p + "f4", 1L);
+    expected.put(p + "f5", 1L);
+    expected.put(TOO_LONG_STAT, 2L);
+    expected.put(TOO_MANY_STAT, 3L);
+    expected.put(SEEN_STAT, 16L);
+    expected.put(EMITTED_STAT, 15L);
+    expected.put(DELETES_IGNORED_STAT, 1L);
+
+    Assert.assertEquals(expected, stats);
+
+    CounterSummary csum = new CounterSummary(stats);
+    Assert.assertEquals(5, csum.getIgnored());
+    Assert.assertEquals(3, csum.getTooMany());
+    Assert.assertEquals(2, csum.getTooLong());
+    Assert.assertEquals(16, csum.getSeen());
+    Assert.assertEquals(15, csum.getEmitted());
+    Assert.assertEquals(1, csum.getDeletesIgnored());
+
+    expected.clear();
+    expected.put("f1", 4L);
+    expected.put("f2", 2L);
+    expected.put("f3", 2L);
+    expected.put("f4", 1L);
+    expected.put("f5", 1L);
+    Assert.assertEquals(expected, csum.getCounters());
+
+  }
+
+  /**
+   * Merges statistic maps with the combiner twice. After each merge only the five largest counters
+   * remain, the counts of dropped counters are added to tooMany, and the generic statistics are
+   * summed.
+   */
+  @Test
+  public void testMerge() {
+    SummarizerConfiguration sc = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(MAX_COUNTERS_OPT, "5").build();
+    VisibilitySummarizer countSum = new VisibilitySummarizer();
+
+    String p = COUNTER_STAT_PREFIX;
+
+    HashMap<String,Long> sm1 = new HashMap<>();
+    sm1.put(p + "f001", 9L);
+    sm1.put(p + "f002", 4L);
+    sm1.put(p + "f003", 2L);
+    sm1.put(p + "f004", 1L);
+    sm1.put(p + "f005", 19L);
+    sm1.put(EMITTED_STAT, 15L);
+    sm1.put(SEEN_STAT, 5L);
+    sm1.put(DELETES_IGNORED_STAT, 1L);
+
+    HashMap<String,Long> sm2 = new HashMap<>();
+    sm2.put(p + "f001", 1L);
+    sm2.put(p + "f002", 2L);
+    sm2.put(p + "f00a", 7L);
+    sm2.put(p + "f00b", 1L);
+    sm2.put(p + "f00c", 17L);
+    sm2.put(EMITTED_STAT, 18L);
+    sm2.put(SEEN_STAT, 6L);
+    sm2.put(DELETES_IGNORED_STAT, 2L);
+
+    // merges sm2 into sm1
+    countSum.combiner(sc).merge(sm1, sm2);
+
+    // dropped: f003 (2) + f004 (1) + f00b (1) = 4 tooMany
+    HashMap<String,Long> expected = new HashMap<>();
+    expected.put(p + "f001", 10L);
+    expected.put(p + "f002", 6L);
+    expected.put(p + "f005", 19L);
+    expected.put(p + "f00a", 7L);
+    expected.put(p + "f00c", 17L);
+    expected.put(TOO_LONG_STAT, 0L);
+    expected.put(TOO_MANY_STAT, 4L);
+    expected.put(EMITTED_STAT, 18L + 15L);
+    expected.put(SEEN_STAT, 6L + 5L);
+    expected.put(DELETES_IGNORED_STAT, 3L);
+
+    Assert.assertEquals(expected, sm1);
+
+    sm2.clear();
+    sm2.put(p + "f001", 19L);
+    sm2.put(p + "f002", 2L);
+    sm2.put(p + "f003", 3L);
+    sm2.put(p + "f00b", 13L);
+    sm2.put(p + "f00c", 2L);
+    sm2.put(TOO_LONG_STAT, 1L);
+    sm2.put(TOO_MANY_STAT, 3L);
+    sm2.put(EMITTED_STAT, 21L);
+    sm2.put(SEEN_STAT, 7L);
+    sm2.put(DELETES_IGNORED_STAT, 5L);
+
+    countSum.combiner(sc).merge(sm1, sm2);
+
+    // dropped: f00a (7) + f003 (3) = 10, plus the existing 4 + 3 tooMany = 17
+    expected.clear();
+    expected.put(p + "f001", 29L);
+    expected.put(p + "f002", 8L);
+    expected.put(p + "f005", 19L);
+    expected.put(p + "f00b", 13L);
+    expected.put(p + "f00c", 19L);
+    expected.put(TOO_LONG_STAT, 1L);
+    expected.put(TOO_MANY_STAT, 17L);
+    expected.put(EMITTED_STAT, 21L + 18 + 15);
+    expected.put(SEEN_STAT, 7L + 6 + 5);
+    expected.put(DELETES_IGNORED_STAT, 8L);
+
+    // previously missing: without this the second merge was never verified
+    Assert.assertEquals(expected, sm1);
+  }
+
+  /** With deletes not ignored, delete keys are counted like any other key. */
+  @Test
+  public void testCountDeletes() {
+    SummarizerConfiguration sc = SummarizerConfiguration.builder(FamilySummarizer.class).addOptions(INGNORE_DELETES_OPT, "false").build();
+    FamilySummarizer countSum = new FamilySummarizer();
+
+    Key k1 = new Key("r1", "f1");
+    Key k2 = new Key("r1", "f1");
+    k2.setDeleted(true);
+    Key k3 = new Key("r1", "f2");
+
+    Collector collector = countSum.collector(sc);
+    collector.accept(k1, new Value(""));
+    collector.accept(k2, new Value(""));
+    collector.accept(k3, new Value(""));
+
+    String p = COUNTER_STAT_PREFIX;
+
+    HashMap<String,Long> expected = new HashMap<>();
+    expected.put(p + "f1", 2L);
+    expected.put(p + "f2", 1L);
+    expected.put(TOO_LONG_STAT, 0L);
+    expected.put(TOO_MANY_STAT, 0L);
+    expected.put(SEEN_STAT, 3L);
+    expected.put(EMITTED_STAT, 3L);
+    expected.put(DELETES_IGNORED_STAT, 0L);
+
+    HashMap<String,Long> stats = new HashMap<>();
+    collector.summarize(stats::put);
+    Assert.assertEquals(expected, stats);
+
+    CounterSummary csum = new CounterSummary(stats);
+    Assert.assertEquals(0, csum.getIgnored());
+    Assert.assertEquals(0, csum.getTooMany());
+    Assert.assertEquals(0, csum.getTooLong());
+    Assert.assertEquals(3, csum.getSeen());
+    Assert.assertEquals(3, csum.getEmitted());
+    Assert.assertEquals(0, csum.getDeletesIgnored());
+
+    expected.clear();
+    expected.put("f1", 2L);
+    expected.put("f2", 1L);
+    Assert.assertEquals(expected, csum.getCounters());
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java b/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java
new file mode 100644
index 0000000..95702d3
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.summary.SummaryCollection.FileSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SummaryCollectionTest {
+
+  /**
+   * Merges several summary collections (including empty ones, and one collection merged twice) and
+   * verifies the aggregated file statistics, both directly and after a round trip through
+   * {@code toThrift()}.
+   */
+  @Test
+  public void testDeleted() {
+    SummarizerConfiguration conf = SummarizerConfiguration.builder(FamilySummarizer.class).build();
+
+    HashMap<String,Long> stats = new HashMap<>();
+    stats.put("c:foo", 9L);
+    FileSummary fs1 = new FileSummary(conf, stats, false);
+    SummaryCollection sc1 = new SummaryCollection(Collections.singleton(fs1));
+
+    // use a fresh map in case FileSummary keeps a reference to the one it was given
+    stats = new HashMap<>();
+    stats.put("c:foo", 5L);
+    stats.put("c:bar", 3L);
+    FileSummary fs2 = new FileSummary(conf, stats, true);
+    SummaryCollection sc2 = new SummaryCollection(Collections.singleton(fs2));
+
+    SummaryCollection sc3 = new SummaryCollection(Collections.emptyList());
+
+    SummaryCollection sc4 = new SummaryCollection(Collections.emptyList(), true);
+
+    SummarizerFactory factory = new SummarizerFactory();
+    SummaryCollection mergeSc = new SummaryCollection();
+    for (SummaryCollection sc : Arrays.asList(sc1, sc2, sc3, sc4, sc4)) {
+      mergeSc.merge(sc, factory);
+    }
+
+    // NOTE(review): the expected counts appear to assume each SummaryCollection stands for one file.
+    // Under that reading, sc3 (no summaries) counts as missing, each merge of sc4 counts as deleted,
+    // fs2 (true flag) counts as extra, and inaccurate = missing + deleted + extra. Confirm against
+    // SummaryCollection.
+    for (SummaryCollection sc : Arrays.asList(mergeSc, new SummaryCollection(mergeSc.toThrift()))) {
+      List<Summary> summaries = sc.getSummaries();
+      Assert.assertEquals(1, summaries.size());
+      Summary summary = summaries.get(0);
+      FileStatistics filestats = summary.getFileStatistics();
+      Assert.assertEquals(5, filestats.getTotal());
+      Assert.assertEquals(1, filestats.getExtra());
+      Assert.assertEquals(0, filestats.getLarge());
+      Assert.assertEquals(1, filestats.getMissing());
+      Assert.assertEquals(2, filestats.getDeleted());
+      Assert.assertEquals(4, filestats.getInaccurate());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
new file mode 100644
index 0000000..9de580d
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CompletableFutureUtilTest {
+ @Test
+ public void testMerge() throws Exception {
+ ExecutorService es = Executors.newFixedThreadPool(3);
+ try {
+ for (int n : new int[] {1, 2, 3, 997, 1000}) {
+ List<CompletableFuture<Integer>> futures = new ArrayList<>();
+ for (int i = 1; i <= n; i++) {
+ final int num = i;
+ futures.add(CompletableFuture.supplyAsync(() -> num, es));
+ }
+
+ CompletableFuture<Integer> mergedFutures = CompletableFutureUtil.merge(futures, Integer::sum, () -> 0);
+ Assert.assertEquals(n * (n + 1) / 2, mergedFutures.get().intValue());
+ }
+
+ // test zero
+ CompletableFuture<Integer> mergedFutures = CompletableFutureUtil.merge(Collections.emptyList(), Integer::sum, () -> 0);
+ Assert.assertEquals(0, mergedFutures.get().intValue());
+ } finally {
+ es.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
index fe5403c..979d943 100644
--- a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
+++ b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
@@ -63,6 +63,8 @@ include::chapters/kerberos.txt[]
include::chapters/sampling.txt[]
+include::chapters/summaries.txt[]
+
include::chapters/administration.txt[]
include::chapters/multivolume.txt[]
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/docs/src/main/asciidoc/chapters/summaries.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/summaries.txt b/docs/src/main/asciidoc/chapters/summaries.txt
new file mode 100644
index 0000000..08d8011
--- /dev/null
+++ b/docs/src/main/asciidoc/chapters/summaries.txt
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+== Summary Statistics
+
+=== Overview
+
+Accumulo has the ability to generate summary statistics about data in a table
+using user defined functions. Currently these statistics are only generated for
+data written to files. Data recently written to Accumulo that is still in
+memory will not contribute to summary statistics.
+
+This feature can be used to inform a user about what data is in their table.
+Summary statistics can also be used by compaction strategies to make decisions
+about which files to compact.
+
+Summary data is stored in each file Accumulo produces. Accumulo can gather
+summary information from across a cluster, merging it along the way. For this
+to be fast, the summary information should fit in cache. There is a dedicated
+cache for summary data on each tserver with a configurable size. In order for
+summary data to fit in cache, it should be kept small.
+
+For information on writing a custom summarizer see the javadoc for
++org.apache.accumulo.core.client.summary.Summarizer+. The package
++org.apache.accumulo.core.client.summary.summarizers+ contains summarizer
+implementations that ship with Accumulo and can be configured for use.
+
+=== Inaccuracies
+
+Summary data can be inaccurate when files are missing summary data or when
+files have extra summary data. Files can contain data outside of a tablet's
+boundaries. This can happen as a result of bulk imported files and tablet splits.
+When this happens, those files could contain extra summary information.
+Accumulo partially offsets this by storing summary information for multiple row
+ranges per file. However, the ranges are not granular enough to completely
+offset extra data.
+
+Any source of inaccuracy is reported when summary information is requested.
+In the shell examples below this can be seen on the +File Statistics+ line.
+For files missing summary information, the compact command in the shell has a
++--sf-no-summary+ option. This option compacts files that do not have the
+summary information configured for the table. The compact command also has the
++--sf-extra-summary+ option which will compact files with extra summary
+information.
+
+=== Configuring
+
+The following tablet server and table properties configure summarization.
+
+* <<appendices/config.txt#_tserver_cache_summary_size>>
+* <<appendices/config.txt#_tserver_summary_partition_threads>>
+* <<appendices/config.txt#_tserver_summary_remote_threads>>
+* <<appendices/config.txt#_tserver_summary_retrieval_threads>>
+* <<appendices/config.txt#TABLE_SUMMARIZER_PREFIX>>
+* <<appendices/config.txt#_table_file_summary_maxsize>>
+
+=== Permissions
+
+Because summary data may be derived from sensitive data, requesting summary data
+requires a special permission. Users must have the table permission
++GET_SUMMARIES+ in order to retrieve summary data.
+
+
+=== Bulk import
+
+When generating rfiles to bulk import into Accumulo, those rfiles can contain
+summary data. To use this feature, look at the javadoc on the
++AccumuloFileOutputFormat.setSummarizers(...)+ method. Also,
++org.apache.accumulo.core.client.rfile.RFile+ has options for creating RFiles
+with embedded summary data.
+
+=== Examples
+
+This example walks through using summarizers in the Accumulo shell. Below a
+table is created and some data is inserted to summarize.
+
+ root@uno> createtable summary_test
+ root@uno summary_test> setauths -u root -s PI,GEO,TIME
+ root@uno summary_test> insert 3b503bd name last Doe
+ root@uno summary_test> insert 3b503bd name first John
+ root@uno summary_test> insert 3b503bd contact address "123 Park Ave, NY, NY" -l PI&GEO
+ root@uno summary_test> insert 3b503bd date birth "1/11/1942" -l PI&TIME
+ root@uno summary_test> insert 3b503bd date married "5/11/1962" -l PI&TIME
+ root@uno summary_test> insert 3b503bd contact home_phone 1-123-456-7890 -l PI
+ root@uno summary_test> insert d5d18dd contact address "50 Lake Shore Dr, Chicago, IL" -l PI&GEO
+ root@uno summary_test> insert d5d18dd name first Jane
+ root@uno summary_test> insert d5d18dd name last Doe
+ root@uno summary_test> insert d5d18dd date birth 8/15/1969 -l PI&TIME
+ root@uno summary_test> scan -s PI,GEO,TIME
+ 3b503bd contact:address [PI&GEO] 123 Park Ave, NY, NY
+ 3b503bd contact:home_phone [PI] 1-123-456-7890
+ 3b503bd date:birth [PI&TIME] 1/11/1942
+ 3b503bd date:married [PI&TIME] 5/11/1962
+ 3b503bd name:first [] John
+ 3b503bd name:last [] Doe
+ d5d18dd contact:address [PI&GEO] 50 Lake Shore Dr, Chicago, IL
+ d5d18dd date:birth [PI&TIME] 8/15/1969
+ d5d18dd name:first [] Jane
+ d5d18dd name:last [] Doe
+
+After inserting the data, summaries are requested below. No summaries are returned.
+
+ root@uno summary_test> summaries
+
+The visibility summarizer is configured below and the table is flushed.
+Flushing the table creates a file, generating summary data in the process. The
+summary data returned counts how many times each column visibility occurred.
+The statistics with a +c:+ prefix are visibilities. The others are generic
+statistics created by the CountingSummarizer that VisibilitySummarizer extends.
+
+ root@uno summary_test> config -t summary_test -s table.summarizer.vis=org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer
+ root@uno summary_test> summaries
+ root@uno summary_test> flush -w
+ 2017-02-24 19:54:46,090 [shell.Shell] INFO : Flush of table summary_test completed.
+ root@uno summary_test> summaries
+ Summarizer : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {}
+ File Statistics : [total:1, missing:0, extra:0, large:0]
+ Summary Statistics :
+ c: = 4
+ c:PI = 1
+ c:PI&GEO = 2
+ c:PI&TIME = 3
+ emitted = 10
+ seen = 10
+ tooLong = 0
+ tooMany = 0
+
+VisibilitySummarizer has an option +maxCounters+ that determines the max number
+of column visibilities it will track. Below, this option is set and a compaction
+is forced to regenerate summary data. The new summary data only has three
+visibilities and now the +tooMany+ statistic is 4. This is the number of
+key/value pairs whose visibility was not counted (here, the four with an empty
+visibility).
+
+ root@uno summary_test> config -t summary_test -s table.summarizer.vis.opt.maxCounters=3
+ root@uno summary_test> compact -w
+ 2017-02-24 19:54:46,267 [shell.Shell] INFO : Compacting table ...
+ 2017-02-24 19:54:47,127 [shell.Shell] INFO : Compaction of table summary_test completed for given range
+ root@uno summary_test> summaries
+ Summarizer : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3}
+ File Statistics : [total:1, missing:0, extra:0, large:0]
+ Summary Statistics :
+ c:PI = 1
+ c:PI&GEO = 2
+ c:PI&TIME = 3
+ emitted = 10
+ seen = 10
+ tooLong = 0
+ tooMany = 4
+
+Another summarizer is configured below that tracks the number of deletes. A
+compaction strategy that uses this summary data is also configured. The
++TooManyDeletesCompactionStrategy+ will force a compaction of the tablet when
+the ratio of deletes to non-deletes is over 25%. This threshold is
+configurable. Below, a delete is added and it is reflected in the statistics. In
+this case there is 1 delete and 10 non-deletes, not enough to force a
+compaction of the tablet.
+
+....
+root@uno summary_test> config -t summary_test -s table.summarizer.del=org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer
+root@uno summary_test> compact -w
+2017-02-24 19:54:47,282 [shell.Shell] INFO : Compacting table ...
+2017-02-24 19:54:49,236 [shell.Shell] INFO : Compaction of table summary_test completed for given range
+root@uno summary_test> config -t summary_test -s table.compaction.major.ratio=10
+root@uno summary_test> config -t summary_test -s table.majc.compaction.strategy=org.apache.accumulo.tserver.compaction.strategies.TooManyDeletesCompactionStrategy
+root@uno summary_test> deletemany -r d5d18dd -c date -f
+[DELETED] d5d18dd date:birth [PI&TIME]
+root@uno summary_test> flush -w
+2017-02-24 19:54:49,686 [shell.Shell] INFO : Flush of table summary_test completed.
+root@uno summary_test> summaries
+ Summarizer : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3}
+ File Statistics : [total:2, missing:0, extra:0, large:0]
+ Summary Statistics :
+ c:PI = 1
+ c:PI&GEO = 2
+ c:PI&TIME = 4
+ emitted = 11
+ seen = 11
+ tooLong = 0
+ tooMany = 4
+
+ Summarizer : org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer del {}
+ File Statistics : [total:2, missing:0, extra:0, large:0]
+ Summary Statistics :
+ deletes = 1
+ total = 11
+....
+
+Some more deletes are added and the table is flushed below. This results in 4
+deletes and 10 non-deletes, which triggers a full compaction. A full
+compaction of all files is the only time when delete markers are dropped. The
+compaction ratio was set to 10 above to show that the number of files did not
+trigger the compaction. After the compaction there are no deletes and 6
+non-deletes.
+
+....
+root@uno summary_test> deletemany -r d5d18dd -f
+[DELETED] d5d18dd contact:address [PI&GEO]
+[DELETED] d5d18dd name:first []
+[DELETED] d5d18dd name:last []
+root@uno summary_test> flush -w
+2017-02-24 19:54:52,800 [shell.Shell] INFO : Flush of table summary_test completed.
+root@uno summary_test> summaries
+ Summarizer : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3}
+ File Statistics : [total:1, missing:0, extra:0, large:0]
+ Summary Statistics :
+ c:PI = 1
+ c:PI&GEO = 1
+ c:PI&TIME = 2
+ emitted = 6
+ seen = 6
+ tooLong = 0
+ tooMany = 2
+
+ Summarizer : org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer del {}
+ File Statistics : [total:1, missing:0, extra:0, large:0]
+ Summary Statistics :
+ deletes = 0
+ total = 6
+root@uno summary_test>
+....
+
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
index 3b19aeb..1c670bc 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
@@ -142,8 +142,7 @@ public class StandaloneAccumuloCluster implements AccumuloCluster {
@Override
public StandaloneClusterControl getClusterControl() {
- return new StandaloneClusterControl(accumuloHome, clientAccumuloConfDir, serverAccumuloConfDir,
- clientCmdPrefix, serverCmdPrefix);
+ return new StandaloneClusterControl(accumuloHome, clientAccumuloConfDir, serverAccumuloConfDir, clientCmdPrefix, serverCmdPrefix);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
index bf1ccc7..45028e9 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
@@ -63,7 +63,8 @@ public class StandaloneClusterControl implements ClusterControl {
protected String accumuloServicePath, accumuloPath, accumuloUtilPath;
- public StandaloneClusterControl(String accumuloHome, String clientAccumuloConfDir, String serverAccumuloConfDir, String clientCmdPrefix, String serverCmdPrefix) {
+ public StandaloneClusterControl(String accumuloHome, String clientAccumuloConfDir, String serverAccumuloConfDir, String clientCmdPrefix,
+ String serverCmdPrefix) {
this.options = new RemoteShellOptions();
this.accumuloHome = accumuloHome;
this.clientAccumuloConfDir = clientAccumuloConfDir;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
index 20763bb..2693c05 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
@@ -139,7 +139,8 @@ public class MiniAccumuloConfigImpl {
mergeProp(Property.TSERV_PORTSEARCH.getKey(), "true");
mergeProp(Property.TSERV_DATACACHE_SIZE.getKey(), "10M");
mergeProp(Property.TSERV_INDEXCACHE_SIZE.getKey(), "10M");
- mergeProp(Property.TSERV_MAXMEM.getKey(), "50M");
+ mergeProp(Property.TSERV_SUMMARYCACHE_SIZE.getKey(), "10M");
+ mergeProp(Property.TSERV_MAXMEM.getKey(), "40M");
mergeProp(Property.TSERV_WALOG_MAX_SIZE.getKey(), "100M");
mergeProp(Property.TSERV_NATIVEMAP_ENABLED.getKey(), "false");
// since there is a small amount of memory, check more frequently for majc... setting may not be needed in 1.5
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index c4edc96..fd7746a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -845,4 +845,9 @@ public class SecurityOperation {
authenticate(credentials);
return hasSystemPermission(credentials, SystemPermission.OBTAIN_DELEGATION_TOKEN, false);
}
+
+  /**
+   * Authenticates the caller, then reports whether it holds the {@code GET_SUMMARIES} permission on
+   * the given table.
+   *
+   * @return the result of the table permission check
+   * @throws ThriftSecurityException as declared; may originate from authentication or the permission
+   *           lookup
+   */
+  public boolean canGetSummaries(TCredentials credentials, String tableId, String namespaceId) throws ThriftSecurityException {
+    authenticate(credentials);
+    final TablePermission required = TablePermission.GET_SUMMARIES;
+    // NOTE(review): trailing false is passed through unchanged from the original; presumably it
+    // disables cached permission lookup — confirm against hasTablePermission.
+    return hasTablePermission(credentials, tableId, namespaceId, required, false);
+  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 47a0d18..7187e3d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -50,6 +50,8 @@ import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -76,7 +78,9 @@ import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator;
import org.apache.accumulo.core.client.impl.Translators;
import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
@@ -102,7 +106,11 @@ import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TKeyValue;
import org.apache.accumulo.core.data.thrift.TMutation;
import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.data.thrift.TRowRange;
+import org.apache.accumulo.core.data.thrift.TSummaries;
+import org.apache.accumulo.core.data.thrift.TSummaryRequest;
import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.master.thrift.Compacting;
@@ -119,6 +127,9 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.summary.Gatherer;
+import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
+import org.apache.accumulo.core.summary.SummaryCollection;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
@@ -235,6 +246,7 @@ import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.session.ScanSession;
import org.apache.accumulo.tserver.session.Session;
import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SummarySession;
import org.apache.accumulo.tserver.session.UpdateSession;
import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
import org.apache.accumulo.tserver.tablet.CommitSession;
@@ -1790,6 +1802,109 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
log.warn("Garbage collector is attempting to remove logs through the tablet server");
log.warn("This is probably because your file Garbage Collector is an older version than your tablet servers.\n" + "Restart your file Garbage Collector.");
}
+
+ private TSummaries getSummaries(Future<SummaryCollection> future) throws TimeoutException {
+ try {
+ SummaryCollection sc = future.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+ return sc.toThrift();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private TSummaries handleTimeout(long sessionId) {
+ long timeout = TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+ sessionManager.removeIfNotAccessed(sessionId, timeout);
+ return new TSummaries(false, sessionId, -1, -1, null);
+ }
+
+ /**
+ * Returns summaries right away if the computation finishes within the wait limit. Otherwise the pending future is
+ * registered in a new SummarySession, so the client can poll for the result later.
+ *
+ * @param credentials credentials stored with the session, if one has to be created
+ * @param future the pending summary computation
+ * @return the completed summaries, or a response carrying the id of the newly created session
+ */
+ private TSummaries startSummaryOperation(TCredentials credentials, Future<SummaryCollection> future) {
+ try {
+ return getSummaries(future);
+ } catch (TimeoutException e) {
+ long sid;
+ // a session id of 0 is never returned to the client: drop that session and allocate another one
+ // (presumably 0 has a special meaning on the client side -- confirm)
+ for (sid = sessionManager.createSession(new SummarySession(credentials, future), false); sid == 0; sid = sessionManager
+ .createSession(new SummarySession(credentials, future), false)) {
+ sessionManager.removeSession(sid);
+ }
+ return handleTimeout(sid);
+ }
+ }
+
+ /**
+ * Starts gathering summaries for a table on behalf of a user.
+ *
+ * The table's namespace is resolved first, so an unknown table is reported as NOTFOUND before any permission check.
+ * The caller then needs the GET_SUMMARIES permission. The gather runs on the summary partition executor.
+ */
+ @Override
+ public TSummaries startGetSummaries(TInfo tinfo, TCredentials credentials, TSummaryRequest request) throws ThriftSecurityException,
+ ThriftTableOperationException, NoSuchScanIDException, TException {
+ final String tableId = request.getTableId();
+
+ final String namespaceId;
+ try {
+ namespaceId = Tables.getNamespaceId(TabletServer.this.getInstance(), tableId);
+ } catch (TableNotFoundException tnfe) {
+ throw new ThriftTableOperationException(tableId, null, null, TableOperationExceptionType.NOTFOUND, null);
+ }
+
+ if (!security.canGetSummaries(credentials, tableId, namespaceId)) {
+ throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+ }
+
+ final TableConfiguration tableConf = TabletServer.this.getServerConfigurationFactory().getTableConfiguration(tableId);
+ final Gatherer gatherer = new Gatherer(TabletServer.this, request, tableConf);
+ return startSummaryOperation(credentials, gatherer.gather(resourceManager.getSummaryPartitionExecutor()));
+ }
+
+ @Override
+ public TSummaries startGetSummariesForPartition(TInfo tinfo, TCredentials credentials, TSummaryRequest request, int modulus, int remainder)
+ throws ThriftSecurityException, NoSuchScanIDException, TException {
+ // users are not expected to call this directly; other tservers do, so system-level permission is required
+ final boolean isSystemCaller = security.canPerformSystemActions(credentials);
+ if (!isSystemCaller) {
+ throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+ }
+
+ // process the (modulus, remainder) slice of the request on the summary remote executor
+ final TableConfiguration tableConf = TabletServer.this.getServerConfigurationFactory().getTableConfiguration(request.getTableId());
+ final Gatherer gatherer = new Gatherer(TabletServer.this, request, tableConf);
+ final Future<SummaryCollection> pending = gatherer.processPartition(resourceManager.getSummaryRemoteExecutor(), modulus, remainder);
+ return startSummaryOperation(credentials, pending);
+ }
+
+ @Override
+ public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials credentials, TSummaryRequest request, Map<String,List<TRowRange>> files)
+ throws ThriftSecurityException, NoSuchScanIDException, TException {
+ // meant for tserver-to-tserver calls only, hence the system permission requirement
+ if (!security.canPerformSystemActions(credentials)) {
+ throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+ }
+
+ // maps a file path to the file system of the volume that contains it
+ final FileSystemResolver resolver = path -> fs.getVolumeByPath(path).getFileSystem();
+ final Gatherer gatherer = new Gatherer(TabletServer.this, request, confFactory.getTableConfiguration(request.getTableId()));
+ final Future<SummaryCollection> pending = gatherer.processFiles(resolver, files, resourceManager.getSummaryCache(), resourceManager.getIndexCache(),
+ resourceManager.getSummaryRetrievalExecutor());
+ return startSummaryOperation(credentials, pending);
+ }
+
+ /**
+ * Polls a summary session created by one of the startGetSummaries* calls.
+ *
+ * If the computation finishes within the wait limit, the session is removed and the summaries are returned. If it is
+ * still running, an incomplete response carrying the same session id is returned.
+ *
+ * @throws NoSuchScanIDException if the id is unknown or does not refer to a summary session
+ */
+ @Override
+ public TSummaries contiuneGetSummaries(TInfo tinfo, long sessionId) throws NoSuchScanIDException, TException {
+ // The id is supplied by the client and may belong to a session of another kind (e.g. a scan). Report that like an
+ // unknown id instead of failing with a ClassCastException.
+ Session session = sessionManager.getSession(sessionId);
+ if (!(session instanceof SummarySession)) {
+ throw new NoSuchScanIDException();
+ }
+
+ Future<SummaryCollection> future = ((SummarySession) session).getFuture();
+ try {
+ TSummaries tsums = getSummaries(future);
+ sessionManager.removeSession(sessionId);
+ return tsums;
+ } catch (TimeoutException e) {
+ return handleTimeout(sessionId);
+ } catch (RuntimeException e) {
+ // A done future that failed (or was cancelled) can never yield a result, so release its session now instead of
+ // leaving it for idle cleanup. If the future is not done (e.g. this thread was interrupted), keep the session.
+ if (future.isDone()) {
+ sessionManager.removeSession(sessionId);
+ }
+ throw e;
+ }
+ }
}
private class SplitRunner implements Runnable {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 3cd7bfa..411ffa5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.tserver;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
@@ -65,7 +66,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
/**
* ResourceManager is responsible for managing the resources of all tablets within a tablet server.
@@ -86,12 +86,13 @@ public class TabletServerResourceManager {
private final ExecutorService assignMetaDataPool;
private final ExecutorService readAheadThreadPool;
private final ExecutorService defaultReadAheadThreadPool;
+ private final ExecutorService summaryRetrievalPool;
+ private final ExecutorService summaryParitionPool;
+ private final ExecutorService summaryRemotePool;
private final Map<String,ExecutorService> threadPools = new TreeMap<>();
private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
- private final VolumeManager fs;
-
private final FileManager fileManager;
private final MemoryManager memoryManager;
@@ -100,6 +101,7 @@ public class TabletServerResourceManager {
private final BlockCache _dCache;
private final BlockCache _iCache;
+ private final BlockCache _sCache;
private final TabletServer tserver;
private final ServerConfigurationFactory conf;
@@ -141,6 +143,14 @@ public class TabletServerResourceManager {
return createEs(max, name, new LinkedBlockingQueue<Runnable>());
}
+ private ExecutorService createIdlingEs(Property max, String name, long timeout, TimeUnit timeUnit) {
+ LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+ int maxThreads = conf.getConfiguration().getCount(max);
+ ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue, new NamingThreadFactory(name));
+ tp.allowCoreThreadTimeOut(true);
+ return addEs(max, name, tp);
+ }
+
private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
int maxThreads = conf.getConfiguration().getCount(max);
ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
@@ -154,7 +164,6 @@ public class TabletServerResourceManager {
public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
this.tserver = tserver;
this.conf = tserver.getServerConfigurationFactory();
- this.fs = fs;
final AccumuloConfiguration acuConf = conf.getConfiguration();
long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM);
@@ -163,15 +172,18 @@ public class TabletServerResourceManager {
long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
+ long sCacheSize = acuConf.getMemoryInBytes(Property.TSERV_SUMMARYCACHE_SIZE);
long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
String policy = acuConf.get(Property.TSERV_CACHE_POLICY);
if (policy.equalsIgnoreCase("LRU")) {
_iCache = new LruBlockCache(iCacheSize, blockSize);
_dCache = new LruBlockCache(dCacheSize, blockSize);
+ _sCache = new LruBlockCache(sCacheSize, blockSize);
} else if (policy.equalsIgnoreCase("TinyLFU")) {
_iCache = new TinyLfuBlockCache(iCacheSize, blockSize);
_dCache = new TinyLfuBlockCache(dCacheSize, blockSize);
+ _sCache = new TinyLfuBlockCache(sCacheSize, blockSize);
} else {
throw new IllegalArgumentException("Unknown Block cache policy " + policy);
}
@@ -179,14 +191,14 @@ public class TabletServerResourceManager {
Runtime runtime = Runtime.getRuntime();
if (usingNativeMap) {
// Still check block cache sizes when using native maps.
- if (dCacheSize + iCacheSize + totalQueueSize > runtime.maxMemory()) {
+ if (dCacheSize + iCacheSize + sCacheSize + totalQueueSize > runtime.maxMemory()) {
throw new IllegalArgumentException(String.format("Block cache sizes %,d and mutation queue size %,d is too large for this JVM configuration %,d",
- dCacheSize + iCacheSize, totalQueueSize, runtime.maxMemory()));
+ dCacheSize + iCacheSize + sCacheSize, totalQueueSize, runtime.maxMemory()));
}
- } else if (maxMemory + dCacheSize + iCacheSize + totalQueueSize > runtime.maxMemory()) {
+ } else if (maxMemory + dCacheSize + iCacheSize + sCacheSize + totalQueueSize > runtime.maxMemory()) {
throw new IllegalArgumentException(String.format(
"Maximum tablet server map memory %,d block cache sizes %,d and mutation queue size %,d is too large for this JVM configuration %,d", maxMemory,
- dCacheSize + iCacheSize, totalQueueSize, runtime.maxMemory()));
+ dCacheSize + iCacheSize + sCacheSize, totalQueueSize, runtime.maxMemory()));
}
runtime.gc();
@@ -222,6 +234,10 @@ public class TabletServerResourceManager {
readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead");
+ summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS, "summary file retriever", 60, TimeUnit.SECONDS);
+ summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60, TimeUnit.SECONDS);
+ summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS, "summary partition", 60, TimeUnit.SECONDS);
+
int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
fileManager = new FileManager(tserver, fs, maxOpenFiles, _dCache, _iCache);
@@ -657,7 +673,7 @@ public class TabletServerResourceManager {
CompactionStrategy strategy = Property.createTableInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class,
new DefaultCompactionStrategy());
strategy.init(Property.getCompactionStrategyOptions(tableConf));
- MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, TabletServerResourceManager.this.fs, tableConf);
+ MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, tableConf);
request.setFiles(tabletFiles);
try {
return strategy.shouldCompact(request);
@@ -760,4 +776,19 @@ public class TabletServerResourceManager {
return _dCache;
}
+ /**
+ * @return the block cache sized by TSERV_SUMMARYCACHE_SIZE, used when reading summary data from files
+ */
+ public BlockCache getSummaryCache() {
+ return this._sCache;
+ }
+
+ /**
+ * @return the idling pool sized by TSERV_SUMMARY_RETRIEVAL_THREADS ("summary file retriever")
+ */
+ public ExecutorService getSummaryRetrievalExecutor() {
+ return this.summaryRetrievalPool;
+ }
+
+ /**
+ * @return the idling pool sized by TSERV_SUMMARY_PARTITION_THREADS ("summary partition")
+ */
+ public ExecutorService getSummaryPartitionExecutor() {
+ return this.summaryParitionPool;
+ }
+
+ /**
+ * @return the idling pool sized by TSERV_SUMMARY_REMOTE_THREADS ("summary remote")
+ */
+ public ExecutorService getSummaryRemoteExecutor() {
+ return this.summaryRemotePool;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 0e0089a..85a4a13 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@ -32,7 +32,6 @@ import java.util.Map;
* compaction thread.
*/
public abstract class CompactionStrategy {
-
/**
* The settings for the compaction strategy pulled from zookeeper. The <tt>table.compacations.major.strategy.opts</tt> part of the setting will be removed.
*/