You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/03/20 14:49:01 UTC
[5/9] accumulo git commit: ACCUMULO-4501 ACCUMULO-96 Added
Summarization
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java
new file mode 100644
index 0000000..cc688c9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.data.thrift.TSummaries;
+import org.apache.accumulo.core.data.thrift.TSummarizerConfiguration;
+import org.apache.accumulo.core.data.thrift.TSummary;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class facilitates merging, storing, and serializing (to/from thrift) intermediate summary information.
+ *
+ * <p>
+ * Merging into a collection never modifies the collection passed as the argument.
+ */
+public class SummaryCollection {
+
+  /**
+   * Summary statistics for a single summarizer configuration, merged across one or more files, together with counts describing those files.
+   */
+  private static class MergedSummary {
+    /** merged summary statistics */
+    Map<String,Long> summary;
+    /** number of files whose data was merged into this summary */
+    long filesContaining;
+    /** number of merged files that had data outside the requested row ranges */
+    long filesExceedingBoundary;
+    /** number of merged files whose summary was not stored because it was too large */
+    long filesLarge;
+
+    public MergedSummary(FileSummary entry) {
+      // copy so that in-place merges below never modify the map owned by the FileSummary
+      this.summary = copyOf(entry.summary);
+      this.filesContaining = 1;
+      this.filesExceedingBoundary = entry.exceededBoundary ? 1 : 0;
+      this.filesLarge = entry.exceededMaxSize ? 1 : 0;
+    }
+
+    public MergedSummary(TSummary tSummary) {
+      this.summary = new HashMap<>(tSummary.getSummary());
+      this.filesContaining = tSummary.getFilesContaining();
+      this.filesExceedingBoundary = tSummary.getFilesExceeding();
+      this.filesLarge = tSummary.getFilesLarge();
+    }
+
+    /**
+     * Copy constructor. Used so that a collection never shares mutable merge state with another collection.
+     */
+    public MergedSummary(MergedSummary other) {
+      this.summary = copyOf(other.summary);
+      this.filesContaining = other.filesContaining;
+      this.filesExceedingBoundary = other.filesExceedingBoundary;
+      this.filesLarge = other.filesLarge;
+    }
+
+    private static Map<String,Long> copyOf(Map<String,Long> map) {
+      return map == null ? null : new HashMap<>(map);
+    }
+
+    /**
+     * Merges {@code other} into this summary. Statistics are combined with the configured summarizer's combiner, which updates this summary's map in
+     * place; {@code other} is not modified. File counts are added.
+     */
+    public void merge(MergedSummary other, SummarizerConfiguration config, SummarizerFactory factory) {
+
+      if (summary == null && other.summary != null) {
+        summary = new HashMap<>(other.summary);
+      } else if (summary != null && other.summary != null) {
+        Summarizer summarizer = factory.getSummarizer(config);
+        summarizer.combiner(config).merge(summary, other.summary);
+      }
+
+      filesContaining += other.filesContaining;
+      filesExceedingBoundary += other.filesExceedingBoundary;
+      filesLarge += other.filesLarge;
+    }
+
+    public TSummary toThrift(SummarizerConfiguration key) {
+      TSummarizerConfiguration tsumConf = SummarizerConfigurationUtil.toThrift(key);
+      return new TSummary(summary, tsumConf, filesContaining, filesExceedingBoundary, filesLarge);
+    }
+
+  }
+
+  private Map<SummarizerConfiguration,MergedSummary> mergedSummaries;
+  private long totalFiles;
+  private long deletedFiles;
+
+  public SummaryCollection() {
+    mergedSummaries = new HashMap<>();
+    totalFiles = 0;
+  }
+
+  public SummaryCollection(TSummaries tsums) {
+    mergedSummaries = new HashMap<>();
+    for (TSummary tSummary : tsums.getSummaries()) {
+      SummarizerConfiguration sconf = SummarizerConfigurationUtil.fromThrift(tSummary.getConfig());
+      mergedSummaries.put(sconf, new MergedSummary(tSummary));
+    }
+
+    totalFiles = tsums.getTotalFiles();
+    deletedFiles = tsums.getDeletedFiles();
+  }
+
+  SummaryCollection(Collection<FileSummary> initialEntries) {
+    this(initialEntries, false);
+  }
+
+  /**
+   * Creates a collection describing a single file.
+   *
+   * @param initialEntries
+   *          the per-summarizer summaries read from the file; must be empty when {@code deleted} is true
+   * @param deleted
+   *          true if the file no longer exists
+   */
+  SummaryCollection(Collection<FileSummary> initialEntries, boolean deleted) {
+    if (deleted) {
+      Preconditions.checkArgument(initialEntries.isEmpty(), "A deleted file can not have summaries");
+    }
+    mergedSummaries = new HashMap<>();
+    for (FileSummary entry : initialEntries) {
+      mergedSummaries.put(entry.conf, new MergedSummary(entry));
+    }
+    totalFiles = 1;
+    this.deletedFiles = deleted ? 1 : 0;
+  }
+
+  /**
+   * Summary information for one summarizer configuration computed from a single file.
+   */
+  static class FileSummary {
+
+    private SummarizerConfiguration conf;
+    private Map<String,Long> summary;
+    private boolean exceededBoundary;
+    private boolean exceededMaxSize;
+
+    FileSummary(SummarizerConfiguration conf, Map<String,Long> summary, boolean exceededBoundary) {
+      this.conf = conf;
+      this.summary = summary;
+      this.exceededBoundary = exceededBoundary;
+      this.exceededMaxSize = false;
+    }
+
+    /**
+     * Creates a file summary recording that the stored summary was too large, so no statistics are available.
+     */
+    FileSummary(SummarizerConfiguration conf) {
+      this.conf = conf;
+      this.summary = new HashMap<>();
+      this.exceededBoundary = false;
+      this.exceededMaxSize = true;
+    }
+  }
+
+  /**
+   * Merges {@code other} into this collection. {@code other} is left unmodified: summaries that are new to this collection are copied rather than
+   * shared, so later merges into this collection can not change {@code other}.
+   */
+  public void merge(SummaryCollection other, SummarizerFactory factory) {
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : other.mergedSummaries.entrySet()) {
+      MergedSummary ms = mergedSummaries.get(entry.getKey());
+      if (ms == null) {
+        mergedSummaries.put(entry.getKey(), new MergedSummary(entry.getValue()));
+      } else {
+        ms.merge(entry.getValue(), entry.getKey(), factory);
+      }
+    }
+
+    this.totalFiles += other.totalFiles;
+    this.deletedFiles += other.deletedFiles;
+  }
+
+  /**
+   * Returns a new collection containing the merge of {@code sc1} and {@code sc2}. Neither argument is modified.
+   */
+  public static SummaryCollection merge(SummaryCollection sc1, SummaryCollection sc2, SummarizerFactory factory) {
+    SummaryCollection ret = new SummaryCollection();
+    ret.merge(sc1, factory);
+    ret.merge(sc2, factory);
+    return ret;
+  }
+
+  public List<Summary> getSummaries() {
+    ArrayList<Summary> ret = new ArrayList<>(mergedSummaries.size());
+
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : mergedSummaries.entrySet()) {
+      SummarizerConfiguration config = entry.getKey();
+      MergedSummary ms = entry.getValue();
+
+      // non-deleted files that did not contribute to this summarizer's summary
+      long filesMissing = (totalFiles - deletedFiles) - ms.filesContaining;
+      ret.add(new Summary(ms.summary, config, totalFiles, filesMissing, ms.filesExceedingBoundary, ms.filesLarge, deletedFiles));
+    }
+
+    return ret;
+  }
+
+  public long getTotalFiles() {
+    return totalFiles;
+  }
+
+  public TSummaries toThrift() {
+    List<TSummary> summaries = new ArrayList<>(mergedSummaries.size());
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : mergedSummaries.entrySet()) {
+      summaries.add(entry.getValue().toThrift(entry.getKey()));
+    }
+
+    return new TSummaries(true, -1L, totalFiles, deletedFiles, summaries);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java
new file mode 100644
index 0000000..7b9ebe4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Holder for one periodic summary: the summary statistics, the row of the last key value covered by the summary, and the number of key values it covers.
+ */
+class SummaryInfo {
+
+  final Map<String,Long> summary;
+  final Text lastRow;
+  final int count;
+
+  /**
+   * Convenience constructor that wraps the raw bytes of the last row in a {@link Text}.
+   */
+  SummaryInfo(byte[] row, Map<String,Long> summary, int count) {
+    this(new Text(row), summary, count);
+  }
+
+  SummaryInfo(Text row, Map<String,Long> summary, int count) {
+    this.summary = summary;
+    this.lastRow = row;
+    this.count = count;
+  }
+
+  /** @return the row of the last key value covered by this summary */
+  Text getLastRow() {
+    return lastRow;
+  }
+
+  /** @return the summary statistics (the map is returned as is, not copied) */
+  Map<String,Long> getSummary() {
+    return summary;
+  }
+
+  /** @return the number of key values covered by this summary */
+  int getCount() {
+    return count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
new file mode 100644
index 0000000..9b2b5d9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
+import org.apache.accumulo.core.summary.Gatherer.RowRange;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Reads the summary information stored in a file's meta blocks (using the block names defined by {@link SummaryWriter}) and computes summaries over row
+ * ranges.
+ */
+public class SummaryReader {
+
+  /** Anything that can open a named meta block, so the same loading code works for both BCFile and RFile readers. */
+  private static interface BlockReader {
+    DataInputStream getMetaBlock(String name) throws IOException;
+  }
+
+  /**
+   * Block cache that stores blocks in the summary cache. On a lookup miss in the summary cache, it also checks the index cache.
+   */
+  private static class CompositeCache implements BlockCache {
+
+    private final BlockCache summaryCache;
+    private final BlockCache indexCache;
+
+    CompositeCache(BlockCache summaryCache, BlockCache indexCache) {
+      this.summaryCache = summaryCache;
+      this.indexCache = indexCache;
+    }
+
+    @Override
+    public CacheEntry cacheBlock(String blockName, byte[] buf) {
+      return summaryCache.cacheBlock(blockName, buf);
+    }
+
+    @Override
+    public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
+      return summaryCache.cacheBlock(blockName, buf, inMemory);
+    }
+
+    @Override
+    public CacheEntry getBlock(String blockName) {
+      CacheEntry ce = summaryCache.getBlock(blockName);
+      if (ce == null) {
+        // It's possible the index cache may have this info, so check there. This is an opportunistic check.
+        ce = indexCache.getBlock(blockName);
+      }
+      return ce;
+    }
+
+    @Override
+    public long getMaxSize() {
+      return summaryCache.getMaxSize();
+    }
+
+    @Override
+    public Stats getStats() {
+      return summaryCache.getStats();
+    }
+  }
+
+  /**
+   * Skips exactly {@code len} bytes of {@code in}.
+   *
+   * @throws IOException
+   *           if the stream ends before {@code len} bytes could be skipped
+   */
+  private static void skipFully(DataInputStream in, long len) throws IOException {
+    long remaining = len;
+    while (remaining > 0) {
+      long skipped = in.skip(remaining);
+      if (skipped <= 0) {
+        // skip() may return 0 without being at the end of the stream; probe with a single read to tell the two cases apart
+        if (in.read() == -1) {
+          throw new IOException("Unexpected end of stream while skipping " + len + " bytes");
+        }
+        skipped = 1;
+      }
+      remaining -= skipped;
+    }
+  }
+
+  /**
+   * Reads the summary index block and loads the summaries accepted by {@code summarySelector}. Summaries may be stored inline in the index block or in a
+   * separate block that the index entry points to.
+   *
+   * @return the loaded summaries, or an empty list if the file has no summary index block
+   */
+  private static List<SummarySerializer> load(BlockReader bcReader, Predicate<SummarizerConfiguration> summarySelector) throws IOException {
+
+    try (DataInputStream in = bcReader.getMetaBlock(SummaryWriter.METASTORE_INDEX)) {
+      List<SummarySerializer> stores = new ArrayList<>();
+
+      readHeader(in);
+      int numSummaries = WritableUtils.readVInt(in);
+      for (int i = 0; i < numSummaries; i++) {
+        SummarizerConfiguration conf = readConfig(in);
+        boolean inline = in.readBoolean();
+        if (inline) {
+          if (summarySelector.test(conf)) {
+            stores.add(SummarySerializer.load(conf, in));
+          } else {
+            SummarySerializer.skip(in);
+          }
+        } else {
+          int block = WritableUtils.readVInt(in);
+          int offset = WritableUtils.readVInt(in);
+          if (summarySelector.test(conf)) {
+            try (DataInputStream summaryIn = bcReader.getMetaBlock(SummaryWriter.METASTORE_PREFIX + "." + block)) {
+              // The offset locates the summary inside the external block, so skip within that block. Skipping the index stream would corrupt parsing
+              // of the remaining index entries.
+              skipFully(summaryIn, offset);
+              stores.add(SummarySerializer.load(conf, summaryIn));
+            } catch (MetaBlockDoesNotExist e) {
+              // this is unexpected
+              throw new IOException(e);
+            }
+          }
+        }
+      }
+
+      return stores;
+    } catch (MetaBlockDoesNotExist e) {
+      return Collections.emptyList();
+    }
+  }
+
+  private static SummaryReader load(CachableBlockFile.Reader bcReader, Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory)
+      throws IOException {
+    SummaryReader fileSummaries = new SummaryReader();
+    fileSummaries.summaryStores = load(name -> bcReader.getMetaBlock(name), summarySelector);
+    fileSummaries.factory = factory;
+    return fileSummaries;
+  }
+
+  public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf, InputStream inputStream, long length,
+      Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory) throws IOException {
+    CachableBlockFile.Reader bcReader = new CachableBlockFile.Reader((InputStream & Seekable) inputStream, length, conf, aConf);
+    return load(bcReader, summarySelector, factory);
+  }
+
+  /**
+   * Creates a reader for a file that no longer exists. Its {@link #getSummaries(List)} returns a collection that counts the file as deleted.
+   */
+  private static SummaryReader deletedFileReader(SummarizerFactory factory) {
+    SummaryReader sr = new SummaryReader();
+    sr.factory = factory;
+    sr.summaryStores = Collections.emptyList();
+    sr.deleted = true;
+    return sr;
+  }
+
+  /**
+   * Loads summaries from {@code file}. If the file does not exist, a reader representing a deleted file is returned.
+   *
+   * @throws UncheckedIOException
+   *           if reading the file fails for any other reason, or closing it fails
+   */
+  public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration aConf, SummarizerFactory factory, Path file,
+      Predicate<SummarizerConfiguration> summarySelector, BlockCache summaryCache, BlockCache indexCache) {
+    CachableBlockFile.Reader bcReader = null;
+
+    try {
+      // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when only summary data is wanted.
+      CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
+      bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf);
+      return load(bcReader, summarySelector, factory);
+    } catch (FileNotFoundException fne) {
+      return deletedFileReader(factory);
+    } catch (IOException e) {
+      try {
+        if (!fs.exists(file)) {
+          return deletedFileReader(factory);
+        }
+      } catch (IOException e1) {
+        // could not determine whether the file still exists; keep the original failure as the primary cause
+        e.addSuppressed(e1);
+      }
+      throw new UncheckedIOException(e);
+    } finally {
+      if (bcReader != null) {
+        try {
+          bcReader.close();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+
+  }
+
+  private static void print(FileSKVIterator fsi, String indent, PrintStream out) throws IOException {
+
+    out.printf("Summary data : \n");
+
+    List<SummarySerializer> stores = load(name -> fsi.getMetaStore(name), conf -> true);
+    int i = 1;
+    for (SummarySerializer summaryStore : stores) {
+      out.printf("%sSummary %d of %d generated by : %s\n", indent, i, stores.size(), summaryStore.getSummarizerConfiguration());
+      i++;
+      summaryStore.print(indent, indent, out);
+    }
+  }
+
+  /** Prints all summary data stored in the RFile to {@code out}. */
+  public static void print(Reader iter, PrintStream out) throws IOException {
+    print(iter, " ", out);
+  }
+
+  /** Reads a summarizer configuration: class name, property id, then a counted list of option key/value pairs. */
+  private static SummarizerConfiguration readConfig(DataInputStream in) throws IOException {
+    // read summarizer configuration
+    String summarizerClazz = in.readUTF();
+    String configId = in.readUTF();
+    SummarizerConfiguration.Builder scb = SummarizerConfiguration.builder(summarizerClazz).setPropertyId(configId);
+    int numOpts = WritableUtils.readVInt(in);
+    for (int i = 0; i < numOpts; i++) {
+      String k = in.readUTF();
+      String v = in.readUTF();
+      scb.addOption(k, v);
+    }
+
+    return scb.build();
+  }
+
+  /** Validates the magic number and version at the start of the summary index block and returns the version. */
+  private static byte readHeader(DataInputStream in) throws IOException {
+    long magic = in.readLong();
+    if (magic != SummaryWriter.MAGIC) {
+      throw new IOException("Bad magic : " + String.format("%x", magic));
+    }
+
+    byte ver = in.readByte();
+    if (ver != SummaryWriter.VER) {
+      throw new IOException("Unknown version : " + ver);
+    }
+
+    return ver;
+  }
+
+  private List<SummarySerializer> summaryStores;
+
+  private SummarizerFactory factory;
+
+  /** true when the file was found not to exist */
+  private boolean deleted;
+
+  /**
+   * Computes this file's summaries over the given row ranges.
+   */
+  public SummaryCollection getSummaries(List<RowRange> ranges) {
+
+    List<SummaryCollection.FileSummary> initial = new ArrayList<>();
+    if (deleted) {
+      return new SummaryCollection(initial, true);
+    }
+    for (SummarySerializer summaryStore : summaryStores) {
+      if (summaryStore.exceededMaxSize()) {
+        initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration()));
+      } else {
+        Map<String,Long> summary = summaryStore.getSummary(ranges, factory);
+        boolean exceeded = summaryStore.exceedsRange(ranges);
+        initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration(), summary, exceeded));
+      }
+    }
+
+    return new SummaryCollection(initial);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java b/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
new file mode 100644
index 0000000..d76bd1a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.Summarizer.Collector;
+import org.apache.accumulo.core.client.summary.Summarizer.Combiner;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.summary.Gatherer.RowRange;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * This class supports serializing summaries and periodically storing summaries. The implementation attempts to generate around 10 summaries that are evenly
+ * spaced. This allows asking for summaries for sub-ranges of data in a rfile.
+ *
+ * <p>
+ * At first a summary is created for every 1000 key values. Once there are more than 10 summaries (that is, when the 12th one is added), adjacent pairs of
+ * summaries are merged, halving their number, and summaries are then created for every 2000 key values. The code keeps merging summaries and doubling the
+ * number of key values per summary. This results in each summary covering about the same number of key values.
+ *
+ * <p>
+ * When saving, if the serialized data is larger than the maximum size, summaries are repeatedly collapsed pairwise. If that is still not enough, only a
+ * marker recording that the maximum size was exceeded is written.
+ */
+class SummarySerializer {
+
+  private SummarizerConfiguration sconf;
+  /** per locality group summaries, or null when the stored summary exceeded the maximum size */
+  private LgSummaries[] allSummaries;
+
+  private SummarySerializer(SummarizerConfiguration sconf, LgSummaries[] allSummaries) {
+    this.sconf = sconf;
+    this.allSummaries = allSummaries;
+  }
+
+  private SummarySerializer(SummarizerConfiguration sconf) {
+    this.sconf = sconf;
+    // this indicates max size was exceeded
+    this.allSummaries = null;
+  }
+
+  public SummarizerConfiguration getSummarizerConfiguration() {
+    return sconf;
+  }
+
+  public void print(String prefix, String indent, PrintStream out) {
+
+    if (allSummaries == null) {
+      out.printf("%sSummary not stored because it was too large\n", prefix + indent);
+    } else {
+      for (LgSummaries lgs : allSummaries) {
+        lgs.print(prefix, indent, out);
+      }
+    }
+  }
+
+  /**
+   * Merges, across all locality groups, the stored summaries that overlap any of {@code ranges}. Must not be called when {@link #exceededMaxSize()} is
+   * true.
+   */
+  public Map<String,Long> getSummary(List<RowRange> ranges, SummarizerFactory sf) {
+
+    Summarizer kvs = sf.getSummarizer(sconf);
+
+    Map<String,Long> summary = new HashMap<>();
+    for (LgSummaries lgs : allSummaries) {
+      lgs.getSummary(ranges, kvs.combiner(sconf), summary);
+    }
+    return summary;
+  }
+
+  /**
+   * Returns true if any locality group has data outside any of {@code ranges}. Must not be called when {@link #exceededMaxSize()} is true.
+   */
+  public boolean exceedsRange(List<RowRange> ranges) {
+    for (LgSummaries lgs : allSummaries) {
+      for (RowRange ke : ranges) {
+        if (lgs.exceedsRange(ke.getStartRow(), ke.getEndRow())) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  public boolean exceededMaxSize() {
+    return allSummaries == null;
+  }
+
+  /** Statistic consumer that records every statistic it is given into {@link #summaries}. */
+  private static class SummaryStoreImpl implements Summarizer.StatisticConsumer {
+
+    HashMap<String,Long> summaries;
+
+    @Override
+    public void accept(String summary, long value) {
+      summaries.put(summary, value);
+    }
+  }
+
+  /**
+   * Builds the periodic summaries for a single locality group.
+   */
+  private static class LgBuilder {
+    private Summarizer summarizer;
+    private SummarizerConfiguration conf;
+    private Collector collector;
+
+    private int maxSummaries = 10;
+
+    private int cutoff = 1000;
+    private int count = 0;
+
+    private List<SummaryInfo> summaries = new ArrayList<>();
+
+    private Key lastKey;
+
+    private SummaryStoreImpl sci = new SummaryStoreImpl();
+
+    private String name;
+
+    private boolean sawFirst = false;
+    private Text firstRow;
+
+    private boolean finished = false;
+
+    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs) {
+      this.conf = conf;
+      this.summarizer = kvs;
+      this.name = "<DEFAULT>";
+      this.collector = kvs.collector(conf);
+    }
+
+    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs, String name) {
+      this.conf = conf;
+      this.summarizer = kvs;
+      this.name = name;
+      this.collector = kvs.collector(conf);
+    }
+
+    /** Feeds a key value to the current collector; once {@code cutoff} key values have been collected, a summary is produced. */
+    public void put(Key k, Value v) {
+      collector.accept(k, v);
+      count++;
+
+      if (!sawFirst) {
+        firstRow = k.getRow();
+        sawFirst = true;
+
+      }
+
+      if (count >= cutoff) {
+        sci.summaries = new HashMap<>();
+        collector.summarize(sci);
+        collector = summarizer.collector(conf);
+        addSummary(k.getRow(), sci.summaries, count);
+        count = 0;
+      }
+
+      lastKey = k;
+    }
+
+    /**
+     * Merges adjacent pairs of summaries in {@code [0, end)}. {@code end} must be even. The left summary's map of each pair is updated in place.
+     */
+    private List<SummaryInfo> merge(int end) {
+      List<SummaryInfo> mergedSummaries = new ArrayList<>();
+      for (int i = 0; i < end; i += 2) {
+        int mergedCount = summaries.get(i).count + summaries.get(i + 1).count;
+        summarizer.combiner(conf).merge(summaries.get(i).summary, summaries.get(i + 1).summary);
+        mergedSummaries.add(new SummaryInfo(summaries.get(i + 1).getLastRow(), summaries.get(i).summary, mergedCount));
+      }
+      return mergedSummaries;
+    }
+
+    private void addSummary(Text row, Map<String,Long> summary, int count) {
+      Preconditions.checkState(!finished);
+      summaries.add(new SummaryInfo(row, summary, count));
+
+      if (summaries.size() % 2 == 0 && summaries.size() > maxSummaries) {
+        summaries = merge(summaries.size());
+        cutoff *= 2;
+      }
+    }
+
+    /**
+     * Halves the number of summaries by merging adjacent pairs. An odd trailing summary is kept as is.
+     *
+     * @return false if there were too few summaries to collapse
+     */
+    boolean collapse() {
+      Preconditions.checkState(finished);
+      if (summaries.size() <= 1) {
+        return false;
+      }
+
+      int end = summaries.size();
+      if (end % 2 == 1) {
+        end--;
+      }
+
+      List<SummaryInfo> mergedSummaries = merge(end);
+
+      if (summaries.size() % 2 == 1) {
+        mergedSummaries.add(summaries.get(summaries.size() - 1));
+      }
+
+      summaries = mergedSummaries;
+
+      return true;
+    }
+
+    /**
+     * Summarizes any key values not yet covered by a summary and marks this builder as finished, which permits {@link #collapse()}.
+     */
+    void finish() {
+      Preconditions.checkState(!finished);
+      // summarize last data
+      if (count > 0) {
+        sci.summaries = new HashMap<>();
+        collector.summarize(sci);
+        collector = null;
+        addSummary(lastKey.getRow(), sci.summaries, count);
+        count = 0;
+      }
+      // Set even when nothing was pending (an empty locality group, or a key count ending exactly on a cutoff). Otherwise collapse() would fail its
+      // finished check when the serialized data is too large.
+      finished = true;
+    }
+
+    /**
+     * Writes the locality group name, the first row (written as an empty row if there was no data), then each summary as last row, count and statistics
+     * encoded through {@code symbolTable}.
+     */
+    public void save(DataOutputStream dos, HashMap<String,Integer> symbolTable) throws IOException {
+      Preconditions.checkState(count == 0);
+
+      dos.writeUTF(name);
+
+      if (firstRow == null) {
+        WritableUtils.writeVInt(dos, 0);
+      } else {
+        firstRow.write(dos);
+      }
+
+      // write summaries
+      WritableUtils.writeVInt(dos, summaries.size());
+      for (SummaryInfo summaryInfo : summaries) {
+        summaryInfo.getLastRow().write(dos);
+        WritableUtils.writeVInt(dos, summaryInfo.count);
+        saveSummary(dos, symbolTable, summaryInfo.summary);
+      }
+    }
+
+    private void saveSummary(DataOutputStream dos, HashMap<String,Integer> symbolTable, Map<String,Long> summary) throws IOException {
+      WritableUtils.writeVInt(dos, summary.size());
+      for (Entry<String,Long> e : summary.entrySet()) {
+        WritableUtils.writeVInt(dos, symbolTable.get(e.getKey()));
+        WritableUtils.writeVLong(dos, e.getValue());
+      }
+    }
+  }
+
+  /**
+   * Builds the summaries for one summarizer configuration across all locality groups of a file and serializes them.
+   */
+  public static class Builder {
+    private Summarizer kvs;
+
+    private SummarizerConfiguration conf;
+
+    private List<LgBuilder> locGroups;
+    private LgBuilder lgb;
+
+    private long maxSize;
+
+    public Builder(SummarizerConfiguration conf, Summarizer kvs, long maxSize) {
+      this.conf = conf;
+      this.kvs = kvs;
+      this.locGroups = new ArrayList<>();
+      this.maxSize = maxSize;
+    }
+
+    public void put(Key k, Value v) {
+      lgb.put(k, v);
+    }
+
+    public SummarizerConfiguration getSummarizerConfiguration() {
+      return conf;
+    }
+
+    /**
+     * Finishes the current locality group and writes the summaries. If the serialized form exceeds the maximum size, summaries are collapsed until it fits
+     * or nothing more can be collapsed. If it still does not fit, only a flag recording that is written.
+     */
+    public void save(DataOutputStream dos) throws IOException {
+
+      if (lgb != null) {
+        lgb.finish();
+        locGroups.add(lgb);
+      }
+
+      byte[] data = _save();
+
+      while (data.length > maxSize) {
+        boolean collapsedSome = false;
+        for (LgBuilder lgBuilder : locGroups) {
+          collapsedSome |= lgBuilder.collapse();
+        }
+
+        if (collapsedSome) {
+          data = _save();
+        } else {
+          break;
+        }
+      }
+
+      if (data.length > maxSize) {
+        dos.writeBoolean(true);
+      } else {
+        dos.writeBoolean(false);
+        // write this out to support efficient skipping
+        WritableUtils.writeVInt(dos, data.length);
+        dos.write(data);
+      }
+    }
+
+    /** Serializes a symbol table of all statistic names, followed by every locality group's summaries. */
+    private byte[] _save() throws IOException {
+
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos)) {
+        // create a symbol table
+        HashMap<String,Integer> symbolTable = new HashMap<>();
+        ArrayList<String> symbols = new ArrayList<>();
+        for (LgBuilder lg : locGroups) {
+          for (SummaryInfo si : lg.summaries) {
+            for (String symbol : si.summary.keySet()) {
+              if (!symbolTable.containsKey(symbol)) {
+                symbolTable.put(symbol, symbols.size());
+                symbols.add(symbol);
+              }
+            }
+          }
+        }
+
+        // write symbol table
+        WritableUtils.writeVInt(dos, symbols.size());
+        for (String symbol : symbols) {
+          dos.writeUTF(symbol);
+        }
+
+        WritableUtils.writeVInt(dos, locGroups.size());
+        for (LgBuilder lg : locGroups) {
+          lg.save(dos, symbolTable);
+        }
+
+        dos.close();
+        return baos.toByteArray();
+      }
+    }
+
+    public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) {
+      if (lgb != null) {
+        lgb.finish();
+        locGroups.add(lgb);
+      }
+
+      lgb = new LgBuilder(conf, kvs, name);
+    }
+
+    public void startDefaultLocalityGroup() {
+      if (lgb != null) {
+        lgb.finish();
+        locGroups.add(lgb);
+      }
+
+      lgb = new LgBuilder(conf, kvs);
+    }
+  }
+
+  public static Builder builder(SummarizerConfiguration conf, SummarizerFactory factory, long maxSize) {
+    return new Builder(conf, factory.getSummarizer(conf), maxSize);
+  }
+
+  /** Skips over a serialized summary written by {@link Builder#save(DataOutputStream)}. */
+  static void skip(DataInputStream in) throws IOException {
+    boolean exceededMaxSize = in.readBoolean();
+    if (!exceededMaxSize) {
+      long len = WritableUtils.readVInt(in);
+      long skipped = in.skip(len);
+      while (skipped < len) {
+        skipped += in.skip(len - skipped);
+      }
+    }
+  }
+
+  /** Reads a serialized summary written by {@link Builder#save(DataOutputStream)}. */
+  static SummarySerializer load(SummarizerConfiguration sconf, DataInputStream in) throws IOException {
+    boolean exceededMaxSize = in.readBoolean();
+    if (!exceededMaxSize) {
+      // length prefix, only needed for skipping
+      WritableUtils.readVInt(in);
+      // load symbol table
+      int numSymbols = WritableUtils.readVInt(in);
+      String[] symbols = new String[numSymbols];
+      for (int i = 0; i < numSymbols; i++) {
+        symbols[i] = in.readUTF();
+      }
+
+      int numLGroups = WritableUtils.readVInt(in);
+      LgSummaries[] allSummaries = new LgSummaries[numLGroups];
+      for (int i = 0; i < numLGroups; i++) {
+        allSummaries[i] = readLGroup(in, symbols);
+      }
+
+      return new SummarySerializer(sconf, allSummaries);
+    } else {
+      return new SummarySerializer(sconf);
+    }
+  }
+
+  /** Deserialized summaries of a single locality group. {@code summaries} may be empty when the group had no data. */
+  private static class LgSummaries {
+
+    private Text firstRow;
+    private SummaryInfo[] summaries;
+    private String lgroupName;
+
+    LgSummaries(Text firstRow, SummaryInfo[] summaries, String lgroupName) {
+      this.firstRow = firstRow;
+      this.summaries = summaries;
+      this.lgroupName = lgroupName;
+    }
+
+    /**
+     * Returns true if this locality group has data on both sides of the exclusive {@code startRow}, or on both sides of the inclusive {@code endRow}.
+     */
+    boolean exceedsRange(Text startRow, Text endRow) {
+
+      if (summaries.length == 0) {
+        // an empty locality group has no data, so nothing can fall outside the range
+        return false;
+      }
+
+      Text lastRow = summaries[summaries.length - 1].lastRow;
+      if (startRow != null && firstRow.compareTo(startRow) <= 0 && startRow.compareTo(lastRow) < 0) {
+        return true;
+      }
+
+      if (endRow != null && endRow.compareTo(firstRow) >= 0 && lastRow.compareTo(endRow) > 0) {
+        return true;
+      }
+
+      return false;
+    }
+
+    void print(String prefix, String indent, PrintStream out) {
+      String p = prefix + indent;
+      out.printf("%sLocality group : %s\n", p, lgroupName);
+      p += indent;
+      for (SummaryInfo si : summaries) {
+        out.printf("%sSummary of %d key values (row of last key '%s') : \n", p, si.count, si.lastRow);
+        Set<Entry<String,Long>> es = si.summary.entrySet();
+        String p2 = p + indent;
+        for (Entry<String,Long> entry : es) {
+          out.printf("%s%s = %s\n", p2, entry.getKey(), entry.getValue());
+        }
+      }
+    }
+
+    /**
+     * Merges into {@code summary} every stored summary that may contain data within any of {@code ranges}. Each stored summary is merged at most once,
+     * even if several ranges overlap it.
+     */
+    void getSummary(List<RowRange> ranges, Combiner combiner, Map<String,Long> summary) {
+      boolean[] summariesThatOverlap = new boolean[summaries.length];
+
+      for (RowRange keyExtent : ranges) {
+        Text startRow = keyExtent.getStartRow();
+        Text endRow = keyExtent.getEndRow();
+
+        if (endRow != null && endRow.compareTo(firstRow) < 0) {
+          continue;
+        }
+
+        int start = -1;
+        int end = summaries.length - 1;
+
+        if (startRow == null) {
+          start = 0;
+        } else {
+          for (int i = 0; i < summaries.length; i++) {
+            if (startRow.compareTo(summaries[i].getLastRow()) < 0) {
+              start = i;
+              break;
+            }
+          }
+        }
+
+        if (start == -1) {
+          continue;
+        }
+
+        if (endRow == null) {
+          end = summaries.length - 1;
+        } else {
+          for (int i = start; i < summaries.length; i++) {
+            if (endRow.compareTo(summaries[i].getLastRow()) < 0) {
+              end = i;
+              break;
+            }
+          }
+        }
+
+        for (int i = start; i <= end; i++) {
+          summariesThatOverlap[i] = true;
+        }
+      }
+
+      for (int i = 0; i < summaries.length; i++) {
+        if (summariesThatOverlap[i]) {
+          combiner.merge(summary, summaries[i].summary);
+        }
+      }
+    }
+  }
+
+  private static LgSummaries readLGroup(DataInputStream in, String[] symbols) throws IOException {
+    String lgroupName = in.readUTF();
+
+    // read first row
+    Text firstRow = new Text();
+    firstRow.readFields(in);
+
+    // read summaries
+    int numSummaries = WritableUtils.readVInt(in);
+    SummaryInfo[] summaries = new SummaryInfo[numSummaries];
+    for (int i = 0; i < numSummaries; i++) {
+      int rowLen = WritableUtils.readVInt(in);
+      byte[] row = new byte[rowLen];
+      in.readFully(row);
+      int count = WritableUtils.readVInt(in);
+      Map<String,Long> summary = readSummary(in, symbols);
+      summaries[i] = new SummaryInfo(row, summary, count);
+    }
+
+    return new LgSummaries(firstRow, summaries, lgroupName);
+  }
+
+  private static Map<String,Long> readSummary(DataInputStream in, String[] symbols) throws IOException {
+    ImmutableMap.Builder<String,Long> imb = ImmutableMap.builder();
+    int numEntries = WritableUtils.readVInt(in);
+
+    for (int i = 0; i < numEntries; i++) {
+      String symbol = symbols[WritableUtils.readVInt(in)];
+      imb.put(symbol, WritableUtils.readVLong(in));
+    }
+
+    return imb.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
new file mode 100644
index 0000000..1ebeeae
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.hadoop.io.WritableUtils;
+
+public class SummaryWriter implements FileSKVWriter {
+
+ static final String METASTORE_PREFIX = "accumulo.summaries";
+ static final String METASTORE_INDEX = "accumulo.summaries.index";
+
+ // echo "accumulo summarize" | sha1sum | head -c 8
+ static long MAGIC = 0x15ea283ec03e4c49L;
+ static byte VER = 1;
+
+ private FileSKVWriter writer;
+ private SummarySerializer.Builder[] summaryStores;
+
+ private SummaryWriter(FileSKVWriter writer, SummarizerFactory factory, List<SummarizerConfiguration> configs, long maxSize) {
+ this.writer = writer;
+ int i = 0;
+ summaryStores = new SummarySerializer.Builder[configs.size()];
+ for (SummarizerConfiguration sconf : configs) {
+ summaryStores[i++] = SummarySerializer.builder(sconf, factory, maxSize);
+ }
+ }
+
+ @Override
+ public boolean supportsLocalityGroups() {
+ return true;
+ }
+
+ @Override
+ public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
+ for (SummarySerializer.Builder ssb : summaryStores) {
+ ssb.startNewLocalityGroup(name, columnFamilies);
+ }
+
+ writer.startNewLocalityGroup(name, columnFamilies);
+ }
+
+ @Override
+ public void startDefaultLocalityGroup() throws IOException {
+ for (SummarySerializer.Builder ssb : summaryStores) {
+ ssb.startDefaultLocalityGroup();
+ }
+ writer.startDefaultLocalityGroup();
+ }
+
+ @Override
+ public void append(Key key, Value value) throws IOException {
+ writer.append(key, value);
+ for (SummarySerializer.Builder ssb : summaryStores) {
+ ssb.put(key, value);
+ }
+ }
+
+ @Override
+ public DataOutputStream createMetaStore(String name) throws IOException {
+ return writer.createMetaStore(name);
+ }
+
+ public void writeConfig(SummarizerConfiguration conf, DataOutputStream dos) throws IOException {
+ // save class (and its config) used to generate summaries
+ dos.writeUTF(conf.getClassName());
+ dos.writeUTF(conf.getPropertyId());
+ WritableUtils.writeVInt(dos, conf.getOptions().size());
+ for (Entry<String,String> entry : conf.getOptions().entrySet()) {
+ dos.writeUTF(entry.getKey());
+ dos.writeUTF(entry.getValue());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ DataOutputStream out = writer.createMetaStore(METASTORE_INDEX);
+ out.writeLong(MAGIC);
+ out.write(VER);
+ WritableUtils.writeVInt(out, summaryStores.length);
+
+ // Could possibly inline small summaries in the future. Breaking summaries into multiple block is better for caching a subset of summaries. Also, keeping
+ // the index small is good for the case where summaries that do not exist are requested. However multiple blocks cause more random I/O in the case when its
+ // not yet in the cache.
+
+ for (int i = 0; i < summaryStores.length; i++) {
+ writeConfig(summaryStores[i].getSummarizerConfiguration(), out);
+ // write if summary is inlined in index... support for possible future optimizations.
+ out.writeBoolean(false);
+ // write pointer to block that will contain summary data
+ WritableUtils.writeVInt(out, i);
+ // write offset of summary data within block. This is not currently used, but it supports storing multiple summaries in an external block in the
+ // future without changing the code.
+ WritableUtils.writeVInt(out, 0);
+ }
+ out.close();
+
+ for (int i = 0; i < summaryStores.length; i++) {
+ DataOutputStream summaryOut = writer.createMetaStore(METASTORE_PREFIX + "." + i);
+ summaryStores[i].save(summaryOut);
+ summaryOut.close();
+ }
+
+ writer.close();
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return writer.getLength();
+ }
+
+ public static FileSKVWriter wrap(FileSKVWriter writer, AccumuloConfiguration tableConfig, boolean useAccumuloStart) {
+ List<SummarizerConfiguration> configs = SummarizerConfigurationUtil.getSummarizerConfigs(tableConfig);
+
+ if (configs.size() == 0) {
+ return writer;
+ }
+
+ SummarizerFactory factory;
+ if (useAccumuloStart) {
+ factory = new SummarizerFactory(tableConfig);
+ } else {
+ factory = new SummarizerFactory();
+ }
+
+ long maxSize = tableConfig.getMemoryInBytes(Property.TABLE_FILE_SUMMARY_MAX_SIZE);
+ return new SummaryWriter(writer, factory, configs, maxSize);
+ }
+}