You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/09/21 15:51:31 UTC
[6/7] accumulo git commit: ACCUMULO-3913 Added per table sampling
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/compaction/NullType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/NullType.java b/core/src/main/java/org/apache/accumulo/core/compaction/NullType.java
new file mode 100644
index 0000000..fb4c452
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/NullType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.compaction;
+
+import com.google.common.base.Preconditions;
+
+public class NullType implements Type { // Type whose convert() accepts only a null input and maps it to the empty string
+ @Override
+ public String convert(String str) {
+ Preconditions.checkArgument(str == null); // Guava precondition: throws IllegalArgumentException for any non-null str
+ return "";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 5bd5c8a..400577c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -514,6 +514,16 @@ public enum Property {
@Experimental
TABLE_VOLUME_CHOOSER("table.volume.chooser", "org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME,
"The class that will be used to select which volume will be used to create new files for this table."),
+ TABLE_SAMPLER( // class name of the table's sampler; the default is the empty string
+ "table.sampler",
+ "",
+ PropertyType.CLASSNAME,
+ "The name of a class that implements org.apache.accumulo.core.sample.Sampler. Setting this option enables storing a sample of data which can be scanned."
+ + " Always having a current sample can be useful for query optimization and data comprehension. After enabling sampling for an existing table, a compaction "
+ + "is needed to compute the sample for existing data. The compact command in the shell has an option to only compact files without sample data."),
+ TABLE_SAMPLER_OPTS("table.sampler.opt.", null, PropertyType.PREFIX, // prefix property: one entry per sampler option
+ "The property is used to set options for a sampler. If a sampler had two options like hasher and modulus, then the two properties "
+ + "table.sampler.opt.hasher=${hash algorithm} and table.sampler.opt.modulus=${mod} would be set."),
// VFS ClassLoader properties
VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY(AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY, "", PropertyType.STRING,
@@ -776,7 +786,7 @@ public enum Property {
return validTableProperties.contains(key) || key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())
|| key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey()) || key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())
|| key.startsWith(Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey()) || key.startsWith(Property.TABLE_REPLICATION_TARGET.getKey())
- || key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey());
+ || key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey()) || key.startsWith(TABLE_SAMPLER_OPTS.getKey());
}
private static final EnumSet<Property> fixedProperties = EnumSet.of(Property.TSERV_CLIENTPORT, Property.TSERV_NATIVEMAP_ENABLED,
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index a5bea83..758df12 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.file.keyfunctor.KeyFunctor;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -424,6 +425,11 @@ public class BloomFilterLayer {
reader.setInterruptFlag(flag);
}
+ @Override
+ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) { // sample counterpart of this reader for the given sampler configuration
+ return new BloomFilterLayer.Reader(reader.getSample(sampleConfig), bfl); // wraps the delegate's sample iterator; NOTE(review): if the delegate can return null the wrapper would hold a null reader - confirm
+ }
+
}
public static void main(String[] args) throws IOException {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
index 60970e2..3713453 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
@@ -21,14 +21,17 @@ import java.io.IOException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-public interface FileSKVIterator extends InterruptibleIterator {
+public interface FileSKVIterator extends InterruptibleIterator, AutoCloseable {
Key getFirstKey() throws IOException;
Key getLastKey() throws IOException;
DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException;
+ FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig);
+
void closeDeepCopies() throws IOException;
void close() throws IOException;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index fb2762f..75cfa7e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.MapFileIterator;
import org.apache.accumulo.core.iterators.system.SequenceFileIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -132,6 +133,11 @@ public class MapFileOperations extends FileOperations {
public void setInterruptFlag(AtomicBoolean flag) {
((FileSKVIterator) reader).setInterruptFlag(flag);
}
+
+ @Override
+ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+ return ((FileSKVIterator) reader).getSample(sampleConfig); // pure delegation to the wrapped reader, cast the same way setInterruptFlag() does
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
index f220a58..01af184 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.HeapIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
class MultiIndexIterator extends HeapIterator implements FileSKVIterator {
@@ -93,4 +94,9 @@ class MultiIndexIterator extends HeapIterator implements FileSKVIterator {
throw new UnsupportedOperationException();
}
+ @Override
+ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+ throw new UnsupportedOperationException(); // sampling is not implemented for this iterator; the call always throws
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index 2109478..1a383e4 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -265,7 +265,7 @@ public class MultiLevelIndex {
public void readFields(DataInput in, int version) throws IOException {
- if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
+ if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7 || version == RFile.RINDEX_VER_8) {
level = in.readInt();
offset = in.readInt();
hasNext = in.readBoolean();
@@ -736,7 +736,7 @@ public class MultiLevelIndex {
size = 0;
- if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
+ if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7 || version == RFile.RINDEX_VER_8) {
size = in.readInt();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index 5a3e911..4631a4d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.start.spi.KeywordExecutable;
@@ -54,6 +55,8 @@ public class PrintInfo implements KeywordExecutable {
boolean hash = false;
@Parameter(names = {"--histogram"}, description = "print a histogram of the key-value sizes")
boolean histogram = false;
+ @Parameter(names = {"--useSample"}, description = "Use sample data for --dump, --vis, --histogram options")
+ boolean useSample = false;
@Parameter(description = " <file> { <file> ... }")
List<String> files = new ArrayList<String>();
@Parameter(names = {"-c", "--config"}, variableArity = true, description = "Comma-separated Hadoop configuration files")
@@ -119,14 +122,27 @@ public class PrintInfo implements KeywordExecutable {
if (opts.histogram || opts.dump || opts.vis || opts.hash) {
localityGroupCF = iter.getLocalityGroupCF();
+ FileSKVIterator dataIter = iter;
+ if (opts.useSample) {
+ dataIter = iter.getSample();
+
+ if (dataIter == null) {
+ System.out.println("ERROR : This rfile has no sample data");
+ return;
+ }
+ }
+
for (Entry<String,ArrayList<ByteSequence>> cf : localityGroupCF.entrySet()) {
- iter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
- while (iter.hasTop()) {
- Key key = iter.getTopKey();
- Value value = iter.getTopValue();
- if (opts.dump)
+ dataIter.seek(new Range((Key) null, (Key) null), cf.getValue(), true);
+ while (dataIter.hasTop()) {
+ Key key = dataIter.getTopKey();
+ Value value = dataIter.getTopValue();
+ if (opts.dump) {
System.out.println(key + " -> " + value);
+ if (System.out.checkError())
+ return;
+ }
if (opts.histogram) {
long size = key.getSize() + value.getSize();
int bucket = (int) Math.log10(size);
@@ -134,7 +150,7 @@ public class PrintInfo implements KeywordExecutable {
sizeBuckets[bucket] += size;
totalSize += size;
}
- iter.next();
+ dataIter.next();
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 54b01b4..9564f0b 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -36,6 +36,8 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ArrayByteSequence;
@@ -62,12 +64,17 @@ import org.apache.accumulo.core.iterators.system.HeapIterator;
import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.util.MutableByteSequence;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
public class RFile {
public static final String EXTENSION = "rf";
@@ -77,15 +84,38 @@ public class RFile {
private RFile() {}
private static final int RINDEX_MAGIC = 0x20637474;
- static final int RINDEX_VER_7 = 7;
- static final int RINDEX_VER_6 = 6;
+
+ static final int RINDEX_VER_8 = 8; // Added sample storage. There is a sample locality group for each locality group. Samples are built using a Sampler and
+ // a sampler configuration. The Sampler and its configuration are stored in the RFile. Persisting the method of producing the
+ // sample allows a user of the RFile to determine if the sample is useful.
+ static final int RINDEX_VER_7 = 7; // Added support for prefix encoding and encryption. Before this change only exact matches within a key field were deduped
+ // for consecutive keys. After this change, if consecutive key fields have the same prefix then the prefix is only stored
+ // once.
+ static final int RINDEX_VER_6 = 6; // Added support for multilevel indexes. Before this the index was one list with an entry for each data block. For large
+ // files, a large index needed to be read into memory before any seek could be done. After this change the index is a fat
+ // tree, and opening a large RFile is much faster. Like the previous version of RFile, each index node in the tree is kept
+ // in memory serialized and used in its serialized form.
// static final int RINDEX_VER_5 = 5; // unreleased
- static final int RINDEX_VER_4 = 4;
- static final int RINDEX_VER_3 = 3;
+ static final int RINDEX_VER_4 = 4; // Added support for seeking using serialized indexes. After this change the index is no longer deserialized when an RFile is opened.
+ // The entire serialized index is read into memory as a single byte array. For seeks, the serialized index is used to find blocks
+ // (the binary search deserializes only the specific entries it needs). This resulted in less memory usage (no object overhead)
+ // and faster open times for RFiles.
+ static final int RINDEX_VER_3 = 3; // Initial released version of RFile. R is for relative encoding. A key is encoded relative to the previous key. The
+ // initial version deduped key fields that were the same for consecutive keys. For sorted data this is a common occurrence.
+ // This version supports locality groups. Each locality group has an index pointing to a set of data blocks. Each data block
+ // contains relatively encoded keys and values.
+
+ // Buffer sample data so that many sample data blocks are stored contiguously. Compared against the buffered key+value bytes in SampleLocalityGroupWriter.flushIfNeeded().
+ private static int sampleBufferSize = 10000000;
+ // The field is static, so the setter below changes the threshold for all writers sharing this class, not just one instance.
+ @VisibleForTesting
+ public static void setSampleBufferSize(int bufferSize) {
+ sampleBufferSize = bufferSize; // no validation is performed on bufferSize
+ }
private static class LocalityGroupMetadata implements Writable {
- private int startBlock;
+ private int startBlock = -1;
private Key firstKey;
private Map<ByteSequence,MutableLong> columnFamilies;
@@ -95,14 +125,15 @@ public class RFile {
private MultiLevelIndex.BufferedWriter indexWriter;
private MultiLevelIndex.Reader indexReader;
+ private int version;
public LocalityGroupMetadata(int version, BlockFileReader br) {
columnFamilies = new HashMap<ByteSequence,MutableLong>();
indexReader = new MultiLevelIndex.Reader(br, version);
+ this.version = version; // kept so the deserialization code and printInfo() can tell whether this RFile version carries a startBlock field
}
- public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
- this.startBlock = nextBlock;
+ public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
isDefaultLG = true;
columnFamilies = new HashMap<ByteSequence,MutableLong>();
previousColumnFamilies = pcf;
@@ -110,8 +141,7 @@ public class RFile {
indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
}
- public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int nextBlock, int indexBlockSize, BlockFileWriter bfw) {
- this.startBlock = nextBlock;
+ public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int indexBlockSize, BlockFileWriter bfw) {
this.name = name;
isDefaultLG = false;
columnFamilies = new HashMap<ByteSequence,MutableLong>();
@@ -181,7 +211,9 @@ public class RFile {
name = in.readUTF();
}
- startBlock = in.readInt();
+ if (version == RINDEX_VER_3 || version == RINDEX_VER_4 || version == RINDEX_VER_6 || version == RINDEX_VER_7) {
+ startBlock = in.readInt();
+ }
int size = in.readInt();
@@ -224,8 +256,6 @@ public class RFile {
out.writeUTF(name);
}
- out.writeInt(startBlock);
-
if (isDefaultLG && columnFamilies == null) {
// only expect null when default LG, otherwise let a NPE occur
out.writeInt(-1);
@@ -246,26 +276,27 @@ public class RFile {
indexWriter.close(out);
}
- public void printInfo() throws IOException {
+ public void printInfo(boolean isSample) throws IOException {
PrintStream out = System.out;
- out.println("Locality group : " + (isDefaultLG ? "<DEFAULT>" : name));
- out.println("\tStart block : " + startBlock);
- out.println("\tNum blocks : " + String.format("%,d", indexReader.size()));
+ out.printf("%-24s : %s\n", (isSample ? "Sample " : "") + "Locality group ", (isDefaultLG ? "<DEFAULT>" : name));
+ if (version == RINDEX_VER_3 || version == RINDEX_VER_4 || version == RINDEX_VER_6 || version == RINDEX_VER_7) {
+ out.printf("\t%-22s : %d\n", "Start block", startBlock);
+ }
+ out.printf("\t%-22s : %,d\n", "Num blocks", indexReader.size());
TreeMap<Integer,Long> sizesByLevel = new TreeMap<Integer,Long>();
TreeMap<Integer,Long> countsByLevel = new TreeMap<Integer,Long>();
indexReader.getIndexInfo(sizesByLevel, countsByLevel);
for (Entry<Integer,Long> entry : sizesByLevel.descendingMap().entrySet()) {
- out.println("\tIndex level " + entry.getKey() + " : "
- + String.format("%,d bytes %,d blocks", entry.getValue(), countsByLevel.get(entry.getKey())));
+ out.printf("\t%-22s : %,d bytes %,d blocks\n", "Index level " + entry.getKey(), entry.getValue(), countsByLevel.get(entry.getKey()));
}
- out.println("\tFirst key : " + firstKey);
+ out.printf("\t%-22s : %s\n", "First key", firstKey);
Key lastKey = null;
if (indexReader.size() > 0) {
lastKey = indexReader.getLastKey();
}
- out.println("\tLast key : " + lastKey);
+ out.printf("\t%-22s : %s\n", "Last key", lastKey);
long numKeys = 0;
IndexIterator countIter = indexReader.lookup(new Key());
@@ -273,48 +304,193 @@ public class RFile {
numKeys += countIter.next().getNumEntries();
}
- out.println("\tNum entries : " + String.format("%,d", numKeys));
- out.println("\tColumn families : " + (isDefaultLG && columnFamilies == null ? "<UNKNOWN>" : columnFamilies.keySet()));
+ out.printf("\t%-22s : %,d\n", "Num entries", numKeys);
+ out.printf("\t%-22s : %s\n", "Column families", (isDefaultLG && columnFamilies == null ? "<UNKNOWN>" : columnFamilies.keySet()));
}
}
- public static class Writer implements FileSKVWriter {
+ private static class SampleEntry { // one buffered key/value pair accepted into the sample
+ Key key;
+ Value val;
- public static final int MAX_CF_IN_DLG = 1000;
+ SampleEntry(Key key, Value val) {
+ this.key = new Key(key); // stores a new Key built from the argument rather than the caller's instance
+ this.val = new Value(val); // likewise for the value; presumably because callers may reuse their objects - confirm
+ }
+ }
+
+ private static class SampleLocalityGroupWriter {
+ // Buffers the key/value pairs accepted by a Sampler and writes them out through a dedicated LocalityGroupWriter.
+ private Sampler sampler;
+ // Accepted entries not yet handed to lgr, and the running sum of their key and value sizes.
+ private List<SampleEntry> entries = new ArrayList<>();
+ private long dataSize = 0;
+ // Writer that receives the buffered entries on flushIfNeeded() and close().
+ private LocalityGroupWriter lgr;
+
+ public SampleLocalityGroupWriter(LocalityGroupWriter lgr, Sampler sampler) {
+ this.lgr = lgr;
+ this.sampler = sampler;
+ }
+ // Buffers a copy of the entry when the sampler accepts its key; entries the sampler rejects are ignored.
+ public void append(Key key, Value value) throws IOException {
+ if (sampler.accept(key)) {
+ entries.add(new SampleEntry(key, value));
+ dataSize += key.getSize() + value.getSize();
+ }
+ }
+ // Writes every entry still buffered to lgr, then closes lgr.
+ public void close() throws IOException {
+ for (SampleEntry se : entries) {
+ lgr.append(se.key, se.val);
+ }
+
+ lgr.close();
+ }
+ // Invoked from LocalityGroupWriter.closeBlock(); writes buffered entries once their total size exceeds sampleBufferSize.
+ public void flushIfNeeded() throws IOException {
+ if (dataSize > sampleBufferSize) {
+ // the reason to write out all but one key is so that closeBlock() can always eventually be called with true
+ List<SampleEntry> subList = entries.subList(0, entries.size() - 1);
+
+ if (subList.size() > 0) {
+ for (SampleEntry se : subList) {
+ lgr.append(se.key, se.val);
+ }
+
+ lgr.closeBlock(subList.get(subList.size() - 1).key, false); // close the sample block under the last key just written; false = not the last block
+
+ subList.clear(); // subList is a view of entries, so this removes the flushed entries and keeps only the final one
+ dataSize = 0; // NOTE(review): the one entry left in the buffer is not counted in dataSize after this reset
+ }
+ }
+ }
+ }
+
+ private static class LocalityGroupWriter { // writes the data blocks and index entries of a single locality group
private BlockFileWriter fileWriter;
private ABlockWriter blockWriter;
// private BlockAppender blockAppender;
private long blockSize = 100000;
- private int indexBlockSize;
private int entries = 0;
- private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
private LocalityGroupMetadata currentLocalityGroup = null;
- private int nextBlock = 0;
private Key lastKeyInBlock = null;
+ private Key prevKey = new Key(); // last key appended; used by the ordering check in append()
+ // Optional writer for this group's sample data; every use below is guarded by a null check.
+ private SampleLocalityGroupWriter sample;
+
+ LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) {
+ this.fileWriter = fileWriter;
+ this.blockSize = blockSize;
+ this.currentLocalityGroup = currentLocalityGroup;
+ this.sample = sample;
+ }
+ // Appends one entry: checks ordering, updates the group metadata, offers the entry to the sample writer, then writes the relative key and the value.
+ public void append(Key key, Value value) throws IOException {
+
+ if (key.compareTo(prevKey) < 0) { // equal keys are allowed; only a strictly smaller key is rejected
+ throw new IllegalStateException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
+ }
+
+ currentLocalityGroup.updateColumnCount(key);
+
+ if (currentLocalityGroup.getFirstKey() == null) {
+ currentLocalityGroup.setFirstKey(key);
+ }
+
+ if (sample != null) {
+ sample.append(key, value);
+ }
+
+ if (blockWriter == null) {
+ blockWriter = fileWriter.prepareDataBlock();
+ } else if (blockWriter.getRawSize() > blockSize) { // current block is over the size limit: index it under the previous key and start a new block
+ closeBlock(prevKey, false);
+ blockWriter = fileWriter.prepareDataBlock();
+ }
+
+ RelativeKey rk = new RelativeKey(lastKeyInBlock, key); // lastKeyInBlock is null for the first key of a block (closeBlock() resets it)
+
+ rk.write(blockWriter);
+ value.write(blockWriter);
+ entries++;
+
+ prevKey = new Key(key);
+ lastKeyInBlock = prevKey;
+
+ }
+ // Closes the current data block and records it in the group's index; lastBlock selects addLast() over add().
+ private void closeBlock(Key key, boolean lastBlock) throws IOException {
+ blockWriter.close();
+
+ if (lastBlock)
+ currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+ else
+ currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+
+ if (sample != null)
+ sample.flushIfNeeded(); // lets the sample writer write its buffered entries now that this data block is closed
+
+ blockWriter = null;
+ lastKeyInBlock = null;
+ entries = 0;
+ }
+ // Closes any open block as the last block, then closes the sample writer if there is one.
+ public void close() throws IOException {
+ if (blockWriter != null) {
+ closeBlock(lastKeyInBlock, true);
+ }
+
+ if (sample != null) {
+ sample.close(); // runs after this group's last data block has been closed
+ }
+ }
+ }
+
+ public static class Writer implements FileSKVWriter {
+
+ public static final int MAX_CF_IN_DLG = 1000;
+
+ private BlockFileWriter fileWriter;
+
+ // private BlockAppender blockAppender;
+ private long blockSize = 100000;
+ private int indexBlockSize;
+
+ private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
+ private ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<LocalityGroupMetadata>();
+ private LocalityGroupMetadata currentLocalityGroup = null;
+ private LocalityGroupMetadata sampleLocalityGroup = null;
+
private boolean dataClosed = false;
private boolean closed = false;
- private Key prevKey = new Key();
private boolean startedDefaultLocalityGroup = false;
private HashSet<ByteSequence> previousColumnFamilies;
private long length = -1;
+ private LocalityGroupWriter lgWriter;
+
+ private SamplerConfigurationImpl samplerConfig;
+ private Sampler sampler;
+
public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
- this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+ this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX), null, null);
}
- public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
+ public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize, SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException {
this.blockSize = blockSize;
this.indexBlockSize = indexBlockSize;
this.fileWriter = bfw;
- this.blockWriter = null;
previousColumnFamilies = new HashSet<ByteSequence>();
+ this.samplerConfig = samplerConfig;
+ this.sampler = sampler;
}
@Override
@@ -329,10 +505,12 @@ public class RFile {
ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
mba.writeInt(RINDEX_MAGIC);
- mba.writeInt(RINDEX_VER_7);
+ mba.writeInt(RINDEX_VER_8);
- if (currentLocalityGroup != null)
+ if (currentLocalityGroup != null) {
localityGroups.add(currentLocalityGroup);
+ sampleGroups.add(sampleLocalityGroup);
+ }
mba.writeInt(localityGroups.size());
@@ -340,6 +518,18 @@ public class RFile {
lc.write(mba);
}
+ if (samplerConfig == null) {
+ mba.writeBoolean(false);
+ } else {
+ mba.writeBoolean(true);
+
+ for (LocalityGroupMetadata lc : sampleGroups) {
+ lc.write(mba);
+ }
+
+ samplerConfig.write(mba);
+ }
+
mba.close();
fileWriter.close();
length = fileWriter.getLength();
@@ -355,8 +545,8 @@ public class RFile {
dataClosed = true;
- if (blockWriter != null) {
- closeBlock(lastKeyInBlock, true);
+ if (lgWriter != null) {
+ lgWriter.close();
}
}
@@ -367,46 +557,7 @@ public class RFile {
throw new IllegalStateException("Cannont append, data closed");
}
- if (key.compareTo(prevKey) < 0) {
- throw new IllegalStateException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
- }
-
- currentLocalityGroup.updateColumnCount(key);
-
- if (currentLocalityGroup.getFirstKey() == null) {
- currentLocalityGroup.setFirstKey(key);
- }
-
- if (blockWriter == null) {
- blockWriter = fileWriter.prepareDataBlock();
- } else if (blockWriter.getRawSize() > blockSize) {
- closeBlock(prevKey, false);
- blockWriter = fileWriter.prepareDataBlock();
- }
-
- RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
-
- rk.write(blockWriter);
- value.write(blockWriter);
- entries++;
-
- prevKey = new Key(key);
- lastKeyInBlock = prevKey;
-
- }
-
- private void closeBlock(Key key, boolean lastBlock) throws IOException {
- blockWriter.close();
-
- if (lastBlock)
- currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
- else
- currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
-
- blockWriter = null;
- lastKeyInBlock = null;
- entries = 0;
- nextBlock++;
+ lgWriter.append(key, value);
}
@Override
@@ -425,28 +576,35 @@ public class RFile {
throw new IllegalStateException("Can not start anymore new locality groups after default locality group started");
}
- if (blockWriter != null) {
- closeBlock(lastKeyInBlock, true);
+ if (lgWriter != null) {
+ lgWriter.close();
}
if (currentLocalityGroup != null) {
localityGroups.add(currentLocalityGroup);
+ sampleGroups.add(sampleLocalityGroup);
}
if (columnFamilies == null) {
startedDefaultLocalityGroup = true;
- currentLocalityGroup = new LocalityGroupMetadata(nextBlock, previousColumnFamilies, indexBlockSize, fileWriter);
+ currentLocalityGroup = new LocalityGroupMetadata(previousColumnFamilies, indexBlockSize, fileWriter);
+ sampleLocalityGroup = new LocalityGroupMetadata(previousColumnFamilies, indexBlockSize, fileWriter);
} else {
if (!Collections.disjoint(columnFamilies, previousColumnFamilies)) {
HashSet<ByteSequence> overlap = new HashSet<ByteSequence>(columnFamilies);
overlap.retainAll(previousColumnFamilies);
throw new IllegalArgumentException("Column families over lap with previous locality group : " + overlap);
}
- currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, nextBlock, indexBlockSize, fileWriter);
+ currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, indexBlockSize, fileWriter);
+ sampleLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, indexBlockSize, fileWriter);
previousColumnFamilies.addAll(columnFamilies);
}
- prevKey = new Key();
+ SampleLocalityGroupWriter sampleWriter = null;
+ if (sampler != null) {
+ sampleWriter = new SampleLocalityGroupWriter(new LocalityGroupWriter(fileWriter, blockSize, sampleLocalityGroup, null), sampler);
+ }
+ lgWriter = new LocalityGroupWriter(fileWriter, blockSize, currentLocalityGroup, sampleWriter);
}
@Override
@@ -616,8 +774,9 @@ public class RFile {
if (columnFamilies.size() != 0 || inclusive)
throw new IllegalArgumentException("I do not know how to filter column families");
- if (interruptFlag != null && interruptFlag.get())
+ if (interruptFlag != null && interruptFlag.get()) {
throw new IterationInterruptedException();
+ }
try {
_seek(range);
@@ -830,6 +989,11 @@ public class RFile {
public void registerMetrics(MetricsGatherer<?> vmg) {
metricsGatherer = vmg;
}
+
+ @Override
+ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+ throw new UnsupportedOperationException(); // sample access is not implemented here; the call always throws
+ }
}
public static class Reader extends HeapIterator implements FileSKVIterator {
@@ -837,8 +1001,12 @@ public class RFile {
private BlockFileReader reader;
private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
+ private ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<LocalityGroupMetadata>();
+
+ private LocalityGroupReader currentReaders[];
+ private LocalityGroupReader readers[];
+ private LocalityGroupReader sampleReaders[];
- private LocalityGroupReader lgReaders[];
private HashSet<ByteSequence> nonDefaultColumnFamilies;
private List<Reader> deepCopies;
@@ -846,6 +1014,10 @@ public class RFile {
private AtomicBoolean interruptFlag;
+ private SamplerConfigurationImpl samplerConfig = null;
+
+ private int rfileVersion;
+
public Reader(BlockFileReader rdr) throws IOException {
this.reader = rdr;
@@ -853,14 +1025,15 @@ public class RFile {
try {
int magic = mb.readInt();
int ver = mb.readInt();
+ rfileVersion = ver;
if (magic != RINDEX_MAGIC)
throw new IOException("Did not see expected magic number, saw " + magic);
- if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
+ if (ver != RINDEX_VER_8 && ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
throw new IOException("Did not see expected version, saw " + ver);
int size = mb.readInt();
- lgReaders = new LocalityGroupReader[size];
+ currentReaders = new LocalityGroupReader[size];
deepCopies = new LinkedList<Reader>();
@@ -869,8 +1042,28 @@ public class RFile {
lgm.readFields(mb);
localityGroups.add(lgm);
- lgReaders[i] = new LocalityGroupReader(reader, lgm, ver);
+ currentReaders[i] = new LocalityGroupReader(reader, lgm, ver);
+ }
+
+ readers = currentReaders;
+
+ if (ver == RINDEX_VER_8 && mb.readBoolean()) {
+ sampleReaders = new LocalityGroupReader[size];
+
+ for (int i = 0; i < size; i++) {
+ LocalityGroupMetadata lgm = new LocalityGroupMetadata(ver, rdr);
+ lgm.readFields(mb);
+ sampleGroups.add(lgm);
+
+ sampleReaders[i] = new LocalityGroupReader(reader, lgm, ver);
+ }
+
+ samplerConfig = new SamplerConfigurationImpl(mb);
+ } else {
+ sampleReaders = null;
+ samplerConfig = null;
}
+
} finally {
mb.close();
}
@@ -881,24 +1074,53 @@ public class RFile {
nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet());
}
- createHeap(lgReaders.length);
+ createHeap(currentReaders.length);
+ }
+
+ private Reader(Reader r, LocalityGroupReader sampleReaders[]) { // builds a Reader over r's file whose active readers are the given sample readers
+ super(sampleReaders.length);
+ this.reader = r.reader;
+ this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
+ this.currentReaders = new LocalityGroupReader[sampleReaders.length];
+ this.deepCopies = r.deepCopies; // same list object as r, not a copy
+ this.deepCopy = false; // not flagged as a deep copy even though state is shared with r
+ this.readers = r.readers;
+ this.sampleReaders = r.sampleReaders;
+ this.samplerConfig = r.samplerConfig;
+ this.rfileVersion = r.rfileVersion;
+ for (int i = 0; i < sampleReaders.length; i++) {
+ this.currentReaders[i] = sampleReaders[i]; // reuses the passed reader objects; no new LocalityGroupReader is created
+ this.currentReaders[i].setInterruptFlag(r.interruptFlag);
+ }
}
- private Reader(Reader r) {
- super(r.lgReaders.length);
+ private Reader(Reader r, boolean useSample) {
+ super(r.currentReaders.length);
this.reader = r.reader;
this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
- this.lgReaders = new LocalityGroupReader[r.lgReaders.length];
+ this.currentReaders = new LocalityGroupReader[r.currentReaders.length];
this.deepCopies = r.deepCopies;
this.deepCopy = true;
- for (int i = 0; i < lgReaders.length; i++) {
- this.lgReaders[i] = new LocalityGroupReader(r.lgReaders[i]);
- this.lgReaders[i].setInterruptFlag(r.interruptFlag);
+ this.samplerConfig = r.samplerConfig;
+ this.rfileVersion = r.rfileVersion;
+ this.readers = r.readers;
+ this.sampleReaders = r.sampleReaders;
+
+ for (int i = 0; i < r.readers.length; i++) {
+ if (useSample) {
+ this.currentReaders[i] = new LocalityGroupReader(r.sampleReaders[i]);
+ this.currentReaders[i].setInterruptFlag(r.interruptFlag);
+ } else {
+ this.currentReaders[i] = new LocalityGroupReader(r.readers[i]);
+ this.currentReaders[i].setInterruptFlag(r.interruptFlag);
+ }
+
}
+
}
private void closeLocalityGroupReaders() {
- for (LocalityGroupReader lgr : lgReaders) {
+ for (LocalityGroupReader lgr : currentReaders) {
try {
lgr.close();
} catch (IOException e) {
@@ -926,6 +1148,16 @@ public class RFile {
closeDeepCopies();
closeLocalityGroupReaders();
+ if (sampleReaders != null) {
+ for (LocalityGroupReader lgr : sampleReaders) {
+ try {
+ lgr.close();
+ } catch (IOException e) {
+ log.warn("Errored out attempting to close LocalityGroupReader.", e);
+ }
+ }
+ }
+
try {
reader.close();
} finally {
@@ -937,17 +1169,17 @@ public class RFile {
@Override
public Key getFirstKey() throws IOException {
- if (lgReaders.length == 0) {
+ if (currentReaders.length == 0) {
return null;
}
Key minKey = null;
- for (int i = 0; i < lgReaders.length; i++) {
+ for (int i = 0; i < currentReaders.length; i++) {
if (minKey == null) {
- minKey = lgReaders[i].getFirstKey();
+ minKey = currentReaders[i].getFirstKey();
} else {
- Key firstKey = lgReaders[i].getFirstKey();
+ Key firstKey = currentReaders[i].getFirstKey();
if (firstKey != null && firstKey.compareTo(minKey) < 0)
minKey = firstKey;
}
@@ -958,17 +1190,17 @@ public class RFile {
@Override
public Key getLastKey() throws IOException {
- if (lgReaders.length == 0) {
+ if (currentReaders.length == 0) {
return null;
}
Key maxKey = null;
- for (int i = 0; i < lgReaders.length; i++) {
+ for (int i = 0; i < currentReaders.length; i++) {
if (maxKey == null) {
- maxKey = lgReaders[i].getLastKey();
+ maxKey = currentReaders[i].getLastKey();
} else {
- Key lastKey = lgReaders[i].getLastKey();
+ Key lastKey = currentReaders[i].getLastKey();
if (lastKey != null && lastKey.compareTo(maxKey) > 0)
maxKey = lastKey;
}
@@ -988,10 +1220,26 @@ public class RFile {
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- Reader copy = new Reader(this);
- copy.setInterruptFlagInternal(interruptFlag);
- deepCopies.add(copy);
- return copy;
+ if (env != null && env.isSamplingEnabled()) {
+ SamplerConfiguration sc = env.getSamplerConfiguration();
+ if (sc == null) {
+ throw new SampleNotPresentException();
+ }
+
+ if (this.samplerConfig != null && this.samplerConfig.equals(new SamplerConfigurationImpl(sc))) {
+ Reader copy = new Reader(this, true);
+ copy.setInterruptFlagInternal(interruptFlag);
+ deepCopies.add(copy);
+ return copy;
+ } else {
+ throw new SampleNotPresentException();
+ }
+ } else {
+ Reader copy = new Reader(this, false);
+ copy.setInterruptFlagInternal(interruptFlag);
+ deepCopies.add(copy);
+ return copy;
+ }
}
@Override
@@ -1027,14 +1275,20 @@ public class RFile {
*/
public void registerMetrics(MetricsGatherer<?> vmg) {
vmg.init(getLocalityGroupCF());
- for (LocalityGroupReader lgr : lgReaders) {
+ for (LocalityGroupReader lgr : currentReaders) {
lgr.registerMetrics(vmg);
}
+
+ if (sampleReaders != null) {
+ for (LocalityGroupReader lgr : sampleReaders) {
+ lgr.registerMetrics(vmg);
+ }
+ }
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
+ numLGSeeked = LocalityGroupIterator.seek(this, currentReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
}
int getNumLocalityGroupsSeeked() {
@@ -1045,16 +1299,53 @@ public class RFile {
ArrayList<Iterator<IndexEntry>> indexes = new ArrayList<Iterator<IndexEntry>>();
- for (LocalityGroupReader lgr : lgReaders) {
+ for (LocalityGroupReader lgr : currentReaders) {
indexes.add(lgr.getIndex());
}
return new MultiIndexIterator(this, indexes);
}
+ @Override
+ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+ Preconditions.checkNotNull(sampleConfig);
+ // a sample reader is handed out only when this file's stored sampler configuration equals the requested one
+ if (this.samplerConfig != null && this.samplerConfig.equals(sampleConfig)) {
+ Reader copy = new Reader(this, sampleReaders);
+ copy.setInterruptFlagInternal(interruptFlag);
+ return copy;
+ }
+ // no stored sampler configuration, or it differs from the requested one
+ return null;
+ }
+
+ // only visible for printinfo; uses this file's own stored sampler configuration and returns null when there is none
+ FileSKVIterator getSample() {
+ if (samplerConfig == null)
+ return null;
+ return getSample(this.samplerConfig);
+ }
+
public void printInfo() throws IOException {
+
+ System.out.printf("%-24s : %d\n", "RFile Version", rfileVersion);
+ System.out.println();
+
for (LocalityGroupMetadata lgm : localityGroups) {
- lgm.printInfo();
+ lgm.printInfo(false);
+ }
+
+ if (sampleGroups.size() > 0) {
+
+ System.out.println();
+ System.out.printf("%-24s :\n", "Sample Configuration");
+ System.out.printf("\t%-22s : %s\n", "Sampler class ", samplerConfig.getClassName());
+ System.out.printf("\t%-22s : %s\n", "Sampler options ", samplerConfig.getOptions());
+ System.out.println();
+
+ for (LocalityGroupMetadata lgm : sampleGroups) {
+ lgm.printInfo(true);
+ }
}
}
@@ -1071,7 +1362,7 @@ public class RFile {
private void setInterruptFlagInternal(AtomicBoolean flag) {
this.interruptFlag = flag;
- for (LocalityGroupReader lgr : lgReaders) {
+ for (LocalityGroupReader lgr : currentReaders) {
lgr.setInterruptFlag(interruptFlag);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 088abfe..17e8e96 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -33,6 +33,9 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.sample.impl.SamplerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -123,8 +126,15 @@ public class RFileOperations extends FileOperations {
long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
+ SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(acuconf);
+ Sampler sampler = null;
+
+ if (samplerConfig != null) {
+ sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
+ }
+
CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf);
- Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
+ Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
return writer;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
index 5a53e93..5dbafa6 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
@@ -18,6 +18,8 @@ package org.apache.accumulo.core.iterators;
import java.io.IOException;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -37,4 +39,52 @@ public interface IteratorEnvironment {
void registerSideChannel(SortedKeyValueIterator<Key,Value> iter);
Authorizations getAuthorizations();
+
+ /**
+ * Returns a new iterator environment object that can be used to create deep copies over sample data. The new object created will use the current sampling
+ * configuration for the table. The existing iterator environment object will not be modified.
+ *
+ * <p>
+ * Since sample data could be created in many different ways, a good practice for an iterator is to verify the sampling configuration is as expected.
+ *
+ * <p>
+ *
+ * <pre>
+ * <code>
+ * class MyIter implements SortedKeyValueIterator&lt;Key,Value&gt; {
+ * SortedKeyValueIterator&lt;Key,Value&gt; source;
+ * SortedKeyValueIterator&lt;Key,Value&gt; sampleIter;
+ * &#64;Override
+ * void init(SortedKeyValueIterator&lt;Key,Value&gt; source, Map&lt;String,String&gt; options, IteratorEnvironment env) {
+ * IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
+ * //do some sanity checks on sampling config
+ * validateSamplingConfiguration(sampleEnv.getSamplerConfiguration());
+ * sampleIter = source.deepCopy(sampleEnv);
+ * this.source = source;
+ * }
+ * }
+ * </code>
+ * </pre>
+ *
+ * @throws SampleNotPresentException
+ * when sampling is not configured for table.
+ * @since 1.8.0
+ */
+ IteratorEnvironment cloneWithSamplingEnabled();
+
+ /**
+ * There are at least two conditions under which sampling will be enabled for an environment. One condition is when sampling is enabled for the scan that
+ * starts everything. Another possibility is for a deep copy created with an environment created by calling {@link #cloneWithSamplingEnabled()}
+ *
+ * @return true if sampling is enabled for this environment.
+ * @since 1.8.0
+ */
+ boolean isSamplingEnabled();
+
+ /**
+ *
+ * @return sampling configuration if sampling is enabled for environment, otherwise returns null.
+ * @since 1.8.0
+ */
+ SamplerConfiguration getSamplerConfiguration();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
index 3999b6f..25c010d 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
@@ -53,6 +54,9 @@ public class SortedMapIterator implements InterruptibleIterator {
@Override
public SortedMapIterator deepCopy(IteratorEnvironment env) {
+ if (env != null && env.isSamplingEnabled()) {
+ throw new SampleNotPresentException(); // a sampling-enabled environment is always refused here
+ }
return new SortedMapIterator(map, interruptFlag); // the copy is built from the same map and interrupt flag
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
index 7723ef1..5b37b30 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
@@ -56,8 +56,6 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
@Override
public Key getTopKey() {
- if (source == null)
- throw new IllegalStateException("no source set");
if (seenSeek == false)
throw new IllegalStateException("never been seeked");
return getSource().getTopKey();
@@ -65,8 +63,6 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
@Override
public Value getTopValue() {
- if (source == null)
- throw new IllegalStateException("no source set");
if (seenSeek == false)
throw new IllegalStateException("never been seeked");
return getSource().getTopValue();
@@ -74,8 +70,6 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
@Override
public boolean hasTop() {
- if (source == null)
- throw new IllegalStateException("no source set");
if (seenSeek == false)
throw new IllegalStateException("never been seeked");
return getSource().hasTop();
@@ -89,8 +83,6 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
@Override
public void next() throws IOException {
- if (source == null)
- throw new IllegalStateException("no source set");
if (seenSeek == false)
throw new IllegalStateException("never been seeked");
getSource().next();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/system/EmptyIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/EmptyIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/EmptyIterator.java
new file mode 100644
index 0000000..b791eb1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/EmptyIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.iterators.system;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+public class EmptyIterator implements InterruptibleIterator {
+ // Iterator with no entries: hasTop() is always false, and init(), seek() and setInterruptFlag() have empty bodies.
+ public static final EmptyIterator EMPTY_ITERATOR = new EmptyIterator();
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {}
+
+ @Override
+ public boolean hasTop() {
+ return false;
+ }
+
+ @Override
+ public void next() throws IOException {
+ // there is never an entry to advance past; callers that honor hasTop() (always false) do not reach this
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {}
+
+ @Override
+ public Key getTopKey() {
+ // there is never a top key; callers that honor hasTop() (always false) do not reach this
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Value getTopValue() {
+ // there is never a top value; callers that honor hasTop() (always false) do not reach this
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return EMPTY_ITERATOR; // env is not consulted; the shared instance is returned instead of a new object
+ }
+
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {}
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
index 9d59570..f9f0600 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.file.map.MapFileUtil;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -154,4 +155,9 @@ public class MapFileIterator implements FileSKVIterator {
public void close() throws IOException {
reader.close();
}
+
+ @Override
+ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/system/SampleIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SampleIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SampleIterator.java
new file mode 100644
index 0000000..aedcdba
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SampleIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.iterators.system;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.sample.Sampler;
+
+public class SampleIterator extends Filter {
+ // Hands each key to the supplied Sampler; accept() returns the sampler's answer and never examines the value.
+ private Sampler sampler = new RowSampler(); // NOTE(review): always overwritten by the only constructor, so this RowSampler is never used
+
+ public SampleIterator(SortedKeyValueIterator<Key,Value> iter, Sampler sampler) {
+ setSource(iter);
+ this.sampler = sampler;
+ }
+
+ @Override
+ public boolean accept(Key k, Value v) {
+ return sampler.accept(k);
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new SampleIterator(getSource().deepCopy(env), sampler); // source is deep-copied; the Sampler instance is shared with the copy
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java
index 8710acd..8ea3800 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SequenceFileIterator.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
@@ -126,4 +127,9 @@ public class SequenceFileIterator implements FileSKVIterator {
public void setInterruptFlag(AtomicBoolean flag) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/AbstractHashSampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/AbstractHashSampler.java b/core/src/main/java/org/apache/accumulo/core/sample/AbstractHashSampler.java
new file mode 100644
index 0000000..ae2b951
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/AbstractHashSampler.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+/**
+ * A base class that can be used to create Samplers based on hashing. This class offers consistent options for configuring the hash function. The subclass
+ * decides which parts of the key to hash.
+ *
+ * <p>
+ * This class supports two options passed into {@link #init(SamplerConfiguration)}. One option is {@code hasher} which specifies a hashing algorithm. Valid
+ * values for this option are {@code md5}, {@code sha1}, and {@code murmur3_32}. If you are not sure, then choose {@code murmur3_32}.
+ *
+ * <p>
+ * The second option is {@code modulus} which can have any positive integer as a value.
+ *
+ * <p>
+ * Any data where {@code hash(data) % modulus == 0} will be selected for the sample.
+ *
+ * @since 1.8.0
+ */
+
+public abstract class AbstractHashSampler implements Sampler {
+
+  private HashFunction hashFunction;
+  private int modulus;
+
+  private static final Set<String> VALID_OPTIONS = ImmutableSet.of("hasher", "modulus");
+
+  /**
+   * Subclasses with options should override this method and return true if the option is valid for the subclass or if {@code super.isValidOption(opt)} returns
+   * true. This base implementation accepts only {@code hasher} and {@code modulus}.
+   */
+  protected boolean isValidOption(String option) {
+    return VALID_OPTIONS.contains(option);
+  }
+
+  /**
+   * Subclasses with options should override this method and call {@code super.init(config)}.
+   * @throws NullPointerException if the {@code hasher} or {@code modulus} option is absent
+   * @throws IllegalArgumentException if an option is not recognized, the hasher name is unknown, or the modulus is not a positive integer
+   */
+  @Override
+  public void init(SamplerConfiguration config) {
+    String hasherOpt = config.getOptions().get("hasher");
+    String modulusOpt = config.getOptions().get("modulus");
+    Preconditions.checkNotNull(hasherOpt, "Hasher not specified");
+    Preconditions.checkNotNull(modulusOpt, "Modulus not specified");
+
+    for (String option : config.getOptions().keySet()) {
+      Preconditions.checkArgument(isValidOption(option), "Unknown option : %s", option);
+    }
+
+    switch (hasherOpt) {
+      case "murmur3_32":
+        hashFunction = Hashing.murmur3_32();
+        break;
+      case "md5":
+        hashFunction = Hashing.md5();
+        break;
+      case "sha1":
+        hashFunction = Hashing.sha1();
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown hasher " + hasherOpt);
+    }
+    // accept() computes hash % modulus, which throws ArithmeticException for 0; reject bad values here rather than on the first key
+    modulus = Integer.parseInt(modulusOpt);
+    Preconditions.checkArgument(modulus > 0, "Modulus must be a positive integer : %s", modulusOpt);
+  }
+
+  /**
+   * Subclass must override this method and hash some portion of the key.
+   */
+  protected abstract HashCode hash(HashFunction hashFunction, Key k);
+
+  @Override
+  public boolean accept(Key k) {
+    return hash(hashFunction, k).asInt() % modulus == 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java b/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
new file mode 100644
index 0000000..ad68cf6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+
+/**
+ * This sampler can hash any subset of a Key's fields. The fields that are hashed for the sample are determined by the configuration options passed in
+ * {@link #init(SamplerConfiguration)}. The following key values are valid options.
+ *
+ * <UL>
+ * <li>row=true|false
+ * <li>family=true|false
+ * <li>qualifier=true|false
+ * <li>visibility=true|false
+ * </UL>
+ *
+ * <p>
+ * If not specified in the options, fields default to false.
+ *
+ * <p>
+ * To determine what options are valid for hashing see {@link AbstractHashSampler}
+ *
+ * <p>
+ * To configure Accumulo to generate sample data on one thousandth of the column qualifiers, the following SamplerConfiguration could be created and used to
+ * configure a table.
+ *
+ * <p>
+ * {@code new SamplerConfiguration(RowColumnSampler.class.getName()).setOptions(ImmutableMap.of("hasher","murmur3_32","modulus","1009","qualifier","true"))}
+ *
+ * <p>
+ * With this configuration, if a column qualifier is selected then all key values containing that column qualifier will end up in the sample data.
+ *
+ * @since 1.8.0
+ */
+
+public class RowColumnSampler extends AbstractHashSampler {
+
+  private static final Set<String> VALID_OPTIONS = ImmutableSet.of("row", "family", "qualifier", "visibility");
+
+  // Which key fields feed the hash. Every flag is reassigned from the options in init().
+  private boolean row = true;
+  private boolean family = true;
+  private boolean qualifier = true;
+  private boolean visibility = true;
+
+  /**
+   * Reads a boolean option. {@link Boolean#parseBoolean(String)} returns false for null, so an option that is absent reads as false.
+   */
+  private boolean hashField(SamplerConfiguration config, String field) {
+    return Boolean.parseBoolean(config.getOptions().get(field));
+  }
+
+  /**
+   * Accepts the options of the parent class plus {@code row}, {@code family}, {@code qualifier} and {@code visibility}.
+   */
+  @Override
+  protected boolean isValidOption(String option) {
+    if (super.isValidOption(option)) {
+      return true;
+    }
+    return VALID_OPTIONS.contains(option);
+  }
+
+  /**
+   * Initializes the parent sampler, then records which key fields take part in the hash.
+   *
+   * @throws IllegalStateException
+   *           if none of the four field options is set to true
+   */
+  @Override
+  public void init(SamplerConfiguration config) {
+    super.init(config);
+
+    row = hashField(config, "row");
+    family = hashField(config, "family");
+    qualifier = hashField(config, "qualifier");
+    visibility = hashField(config, "visibility");
+
+    final boolean anyFieldHashed = row || family || qualifier || visibility;
+    if (!anyFieldHashed) {
+      throw new IllegalStateException("Must hash at least one key field");
+    }
+  }
+
+  /**
+   * Feeds the bytes covered by a ByteSequence into the hasher.
+   */
+  private void putByteSequence(Hasher sink, ByteSequence bytes) {
+    sink.putBytes(bytes.getBackingArray(), bytes.offset(), bytes.length());
+  }
+
+  /**
+   * Hashes the enabled key fields, always in the order row, family, qualifier, visibility.
+   */
+  @Override
+  protected HashCode hash(HashFunction hashFunction, Key k) {
+    final Hasher sink = hashFunction.newHasher();
+
+    if (row) {
+      putByteSequence(sink, k.getRowData());
+    }
+
+    if (family) {
+      putByteSequence(sink, k.getColumnFamilyData());
+    }
+
+    if (qualifier) {
+      putByteSequence(sink, k.getColumnQualifierData());
+    }
+
+    if (visibility) {
+      putByteSequence(sink, k.getColumnVisibilityData());
+    }
+
+    return sink.hash();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/RowSampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/RowSampler.java b/core/src/main/java/org/apache/accumulo/core/sample/RowSampler.java
new file mode 100644
index 0000000..8690a1c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/RowSampler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+
+/**
+ * Builds a sample based on entire rows. If a row is selected for the sample, then all of its columns will be included.
+ *
+ * <p>
+ * To determine what options are valid for hashing see {@link AbstractHashSampler}. This class offers no additional options; it always hashes on the row.
+ *
+ * <p>
+ * To configure Accumulo to generate sample data on one thousandth of the rows, the following SamplerConfiguration could be created and used to configure a
+ * table.
+ *
+ * <p>
+ * {@code new SamplerConfiguration(RowSampler.class.getName()).setOptions(ImmutableMap.of("hasher","murmur3_32","modulus","1009"))}
+ *
+ * @since 1.8.0
+ */
+
+public class RowSampler extends AbstractHashSampler {
+
+  /**
+   * Hashes only the row portion of the key.
+   */
+  @Override
+  protected HashCode hash(HashFunction hashFunction, Key k) {
+    final ByteSequence rowData = k.getRowData();
+    final byte[] backing = rowData.getBackingArray();
+    final int start = rowData.offset();
+    final int len = rowData.length();
+    return hashFunction.hashBytes(backing, start, len);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/Sampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/Sampler.java b/core/src/main/java/org/apache/accumulo/core/sample/Sampler.java
new file mode 100644
index 0000000..64adeec
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/Sampler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+
+/**
+ * A function that decides which key values are stored in a table's sample. As Accumulo compacts data and creates rfiles it uses a Sampler to decide what to
+ * store in the rfile's sample section. The class name of the Sampler and the Sampler's configuration are stored in each rfile. A scan of a table's sample will
+ * only succeed if all rfiles were created with the same sampler and sampler configuration.
+ *
+ * <p>
+ * Since the decisions that a Sampler makes are persisted, the behavior of a Sampler for a given configuration should always be the same. One way to offer a new
+ * behavior is to offer new options, while still supporting old behavior with a Sampler's existing options.
+ *
+ * <p>
+ * Ideally a sampler that selects a Key k1 would also select updates for k1. For example if a Sampler selects :
+ * {@code row='000989' family='name' qualifier='last' visibility='ADMIN' time=9 value='Doe'}, it would be nice if it also selected :
+ * {@code row='000989' family='name' qualifier='last' visibility='ADMIN' time=20 value='Dough'}. Using hash and modulo on the key fields is a good way to
+ * accomplish this and {@link AbstractHashSampler} provides a good basis for implementation.
+ *
+ * @since 1.8.0
+ */
+
+public interface Sampler {
+
+ /**
+ * An implementation of Sampler must have a no-arg constructor. After construction this method is called once to initialize a sampler before it is used.
+ *
+ * @param config
+ * Configuration options for a sampler.
+ */
+ void init(SamplerConfiguration config);
+
+ /**
+ * Decides whether a key belongs in the sample.
+ *
+ * @param k
+ * A key that was written to a rfile.
+ * @return True if the key (and its associated value) should be stored in the rfile's sample. Return false if it should not be included.
+ */
+ boolean accept(Key k);
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
new file mode 100644
index 0000000..348def4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Writable;
+
+public class SamplerConfigurationImpl implements Writable {
+  private String className;
+  private Map<String,String> options;
+
+  /**
+   * Creates a configuration by reading its serialized form from {@code in}, see {@link #readFields(DataInput)}.
+   */
+  public SamplerConfigurationImpl(DataInput in) throws IOException {
+    readFields(in);
+  }
+
+  /**
+   * Creates a configuration from the public API object, copying its options.
+   */
+  public SamplerConfigurationImpl(SamplerConfiguration sc) {
+    this.className = sc.getSamplerClassName();
+    this.options = new HashMap<>(sc.getOptions());
+  }
+
+  /**
+   * Creates a configuration from a sampler class name and its options.
+   *
+   * @param className
+   *          name of the sampler class
+   * @param options
+   *          sampler options; the map is copied
+   * @throws NullPointerException
+   *           if options is null
+   */
+  public SamplerConfigurationImpl(String className, Map<String,String> options) {
+    this.className = className;
+    // Copy, as the SamplerConfiguration constructor does, so later changes to the caller's map cannot alter equals/hashCode or the serialized form.
+    this.options = new HashMap<>(options);
+  }
+
+  /**
+   * Creates an empty instance. The class name and options stay null until {@link #readFields(DataInput)} is called.
+   */
+  public SamplerConfigurationImpl() {}
+
+  public String getClassName() {
+    return className;
+  }
+
+  /**
+   * @return a read-only view of the sampler options
+   */
+  public Map<String,String> getOptions() {
+    return Collections.unmodifiableMap(options);
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * className.hashCode() + options.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof SamplerConfigurationImpl) {
+      SamplerConfigurationImpl osc = (SamplerConfigurationImpl) o;
+
+      return className.equals(osc.className) && options.equals(osc.options);
+    }
+
+    return false;
+  }
+
+  /**
+   * Writes a version byte, the class name, the number of options, and then each option key and value.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // The Writable serialization methods for this class are called by RFile and therefore must be very stable. An alternative way to serialize this class is
+    // to use Thrift. That was not used here in order to avoid making RFile depend on Thrift.
+
+    // versioning info
+    out.write(1);
+
+    out.writeUTF(className);
+
+    out.writeInt(options.size());
+
+    for (Entry<String,String> entry : options.entrySet()) {
+      out.writeUTF(entry.getKey());
+      out.writeUTF(entry.getValue());
+    }
+  }
+
+  /**
+   * Reads the format produced by {@link #write(DataOutput)}, replacing the current class name and options.
+   *
+   * @throws IllegalArgumentException
+   *           if the version byte is not 1
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int version = in.readByte();
+
+    if (version != 1) {
+      throw new IllegalArgumentException("Unexpected version " + version);
+    }
+
+    className = in.readUTF();
+
+    options = new HashMap<>();
+
+    int num = in.readInt();
+
+    for (int i = 0; i < num; i++) {
+      String key = in.readUTF();
+      String val = in.readUTF();
+      options.put(key, val);
+    }
+  }
+
+  /**
+   * @return a new public API configuration object carrying this class name and these options
+   */
+  public SamplerConfiguration toSamplerConfiguration() {
+    SamplerConfiguration sc = new SamplerConfiguration(className);
+    sc.setOptions(options);
+    return sc;
+  }
+
+  /**
+   * Converts this configuration to table properties: one property per option, keyed by the {@link Property#TABLE_SAMPLER_OPTS} prefix plus the option name,
+   * followed by the {@link Property#TABLE_SAMPLER} property holding the class name.
+   */
+  public List<Pair<String,String>> toTableProperties() {
+    List<Pair<String,String>> props = new ArrayList<>();
+
+    for (Entry<String,String> entry : options.entrySet()) {
+      props.add(new Pair<String,String>(Property.TABLE_SAMPLER_OPTS.getKey() + entry.getKey(), entry.getValue()));
+    }
+
+    // intentionally added last, so its set last
+    props.add(new Pair<String,String>(Property.TABLE_SAMPLER.getKey(), className));
+
+    return props;
+  }
+
+  /**
+   * @return the pairs from {@link #toTableProperties()} as a map that iterates in the same order
+   */
+  public Map<String,String> toTablePropertiesMap() {
+    Map<String,String> propsMap = new LinkedHashMap<>();
+    for (Pair<String,String> pair : toTableProperties()) {
+      propsMap.put(pair.getFirst(), pair.getSecond());
+    }
+
+    return propsMap;
+  }
+
+  /**
+   * Builds a sampler configuration from table configuration.
+   *
+   * @return the configuration, or null when {@link Property#TABLE_SAMPLER} is unset or empty
+   */
+  public static SamplerConfigurationImpl newSamplerConfig(AccumuloConfiguration acuconf) {
+    String className = acuconf.get(Property.TABLE_SAMPLER);
+
+    if (className == null || className.equals("")) {
+      return null;
+    }
+
+    Map<String,String> rawOptions = acuconf.getAllPropertiesWithPrefix(Property.TABLE_SAMPLER_OPTS);
+    Map<String,String> options = new HashMap<>();
+
+    // Strip the property prefix so only the bare option name remains.
+    int prefixLength = Property.TABLE_SAMPLER_OPTS.getKey().length();
+    for (Entry<String,String> entry : rawOptions.entrySet()) {
+      String key = entry.getKey().substring(prefixLength);
+      options.put(key, entry.getValue());
+    }
+
+    return new SamplerConfigurationImpl(className, options);
+  }
+
+  @Override
+  public String toString() {
+    return className + " " + options;
+  }
+
+  /**
+   * @return the Thrift form of the given configuration, or null if it is null
+   */
+  public static TSamplerConfiguration toThrift(SamplerConfiguration samplerConfig) {
+    if (samplerConfig == null)
+      return null;
+    return new TSamplerConfiguration(samplerConfig.getSamplerClassName(), samplerConfig.getOptions());
+  }
+
+  /**
+   * @return the public API form of the given Thrift configuration, or null if it is null
+   */
+  public static SamplerConfiguration fromThrift(TSamplerConfiguration tsc) {
+    if (tsc == null)
+      return null;
+    return new SamplerConfiguration(tsc.getClassName()).setOptions(tsc.getOptions());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
new file mode 100644
index 0000000..3f11fbe
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.sample.impl;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+
+public class SamplerFactory {
+  /**
+   * Loads the sampler class named by {@code config}, instantiates it through its no-arg constructor and initializes it with the configuration's options. The
+   * class is loaded through the table's classpath context when {@link Property#TABLE_CLASSPATH} is set to a non-empty value, and through the default
+   * Accumulo class loader otherwise.
+   *
+   * @param config
+   *          names the sampler class and supplies its options
+   * @param acuconf
+   *          configuration consulted for the classpath context
+   * @return an initialized sampler
+   * @throws RuntimeException
+   *           wrapping the cause if the class cannot be found, instantiated or accessed
+   */
+  public static Sampler newSampler(SamplerConfigurationImpl config, AccumuloConfiguration acuconf) throws IOException {
+    final String classpathContext = acuconf.get(Property.TABLE_CLASSPATH);
+    final boolean useContext = classpathContext != null && !classpathContext.isEmpty();
+
+    try {
+      final Class<? extends Sampler> samplerClass;
+      if (useContext) {
+        samplerClass = AccumuloVFSClassLoader.getContextManager().loadClass(classpathContext, config.getClassName(), Sampler.class);
+      } else {
+        samplerClass = AccumuloVFSClassLoader.loadClass(config.getClassName(), Sampler.class);
+      }
+
+      final Sampler instance = samplerClass.newInstance();
+      instance.init(config.toSamplerConfiguration());
+      return instance;
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}