Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/09/21 15:51:28 UTC
[3/7] accumulo git commit: ACCUMULO-3913 Added per table sampling
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index a4936cf..07757a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -186,11 +186,11 @@ public class LocalityGroupUtil {
return ecf;
}
- private static class PartitionedMutation extends Mutation {
+ public static class PartitionedMutation extends Mutation {
private byte[] row;
private List<ColumnUpdate> updates;
- PartitionedMutation(byte[] row, List<ColumnUpdate> updates) {
+ public PartitionedMutation(byte[] row, List<ColumnUpdate> updates) {
this.row = row;
this.updates = updates;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 051daee..27b72f2 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -31,6 +31,10 @@ exception TooManyFilesException {
1:data.TKeyExtent extent
}
+exception TSampleNotPresentException {
+ 1:data.TKeyExtent extent
+}
+
exception NoSuchScanIDException {
}
@@ -136,6 +140,11 @@ struct IteratorConfig {
1:list<TIteratorSetting> iterators;
}
+struct TSamplerConfiguration {
+ 1:string className
+ 2:map<string, string> options
+}
+
service TabletClientService extends client.ClientService {
// scan a range of keys
data.InitialScan startScan(11:trace.TInfo tinfo,
@@ -150,9 +159,10 @@ service TabletClientService extends client.ClientService {
9:bool waitForWrites,
10:bool isolated,
12:i64 readaheadThreshold,
- 13:i64 batchTimeOut) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
+ 13:TSamplerConfiguration samplerConfig,
+ 14:i64 batchTimeOut) throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe, 4:TSampleNotPresentException tsnpe),
- data.ScanResult continueScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
+ data.ScanResult continueScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe, 4:TSampleNotPresentException tsnpe),
oneway void closeScan(2:trace.TInfo tinfo, 1:data.ScanID scanID),
// scan over a series of ranges
@@ -164,8 +174,9 @@ service TabletClientService extends client.ClientService {
5:map<string, map<string, string>> ssio,
6:list<binary> authorizations,
7:bool waitForWrites,
- 9:i64 batchTimeOut) throws (1:client.ThriftSecurityException sec),
- data.MultiScanResult continueMultiScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi),
+ 9:TSamplerConfiguration samplerConfig,
+ 10:i64 batchTimeOut) throws (1:client.ThriftSecurityException sec, 2:TSampleNotPresentException tsnpe),
+ data.MultiScanResult continueMultiScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi, 2:TSampleNotPresentException tsnpe),
void closeMultiScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi),
//the following calls support a batch update to multiple tablets on a tablet server
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
index 7a56d1d..7bf9eb1 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
@@ -226,6 +227,22 @@ public class TableOperationsHelperTest {
TableNotFoundException {
return false;
}
+
+ @Override
+ public void setSamplerConfiguration(String tableName, SamplerConfiguration samplerConfiguration) throws TableNotFoundException, AccumuloException,
+ AccumuloSecurityException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clearSamplerConfiguration(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SamplerConfiguration getSamplerConfiguration(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ throw new UnsupportedOperationException();
+ }
}
protected TableOperationsHelper getHelper() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
index bcf8a24..d88453e 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
@@ -20,9 +20,12 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Test;
@@ -36,6 +39,9 @@ public class AccumuloFileOutputFormatTest {
long c = 50l;
long d = 10l;
String e = "snappy";
+ SamplerConfiguration samplerConfig = new SamplerConfiguration(RowSampler.class.getName());
+ samplerConfig.addOption("hasher", "murmur3_32");
+ samplerConfig.addOption("modulus", "109");
JobConf job = new JobConf();
AccumuloFileOutputFormat.setReplication(job, a);
@@ -43,6 +49,7 @@ public class AccumuloFileOutputFormatTest {
AccumuloFileOutputFormat.setDataBlockSize(job, c);
AccumuloFileOutputFormat.setIndexBlockSize(job, d);
AccumuloFileOutputFormat.setCompressionType(job, e);
+ AccumuloFileOutputFormat.setSampler(job, samplerConfig);
AccumuloConfiguration acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
@@ -51,12 +58,16 @@ public class AccumuloFileOutputFormatTest {
assertEquals(50l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
assertEquals(10l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+ assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
a = 17;
b = 1300l;
c = 150l;
d = 110l;
e = "lzo";
+ samplerConfig = new SamplerConfiguration(RowSampler.class.getName());
+ samplerConfig.addOption("hasher", "md5");
+ samplerConfig.addOption("modulus", "100003");
job = new JobConf();
AccumuloFileOutputFormat.setReplication(job, a);
@@ -64,6 +75,7 @@ public class AccumuloFileOutputFormatTest {
AccumuloFileOutputFormat.setDataBlockSize(job, c);
AccumuloFileOutputFormat.setIndexBlockSize(job, d);
AccumuloFileOutputFormat.setCompressionType(job, e);
+ AccumuloFileOutputFormat.setSampler(job, samplerConfig);
acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
@@ -72,6 +84,6 @@ public class AccumuloFileOutputFormatTest {
assertEquals(150l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
assertEquals(110l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
-
+ assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
index 3923566..cf0c8d6 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
@@ -20,9 +20,12 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;
@@ -36,6 +39,9 @@ public class AccumuloFileOutputFormatTest {
long c = 50l;
long d = 10l;
String e = "snappy";
+ SamplerConfiguration samplerConfig = new SamplerConfiguration(RowSampler.class.getName());
+ samplerConfig.addOption("hasher", "murmur3_32");
+ samplerConfig.addOption("modulus", "109");
Job job1 = Job.getInstance();
AccumuloFileOutputFormat.setReplication(job1, a);
@@ -43,6 +49,7 @@ public class AccumuloFileOutputFormatTest {
AccumuloFileOutputFormat.setDataBlockSize(job1, c);
AccumuloFileOutputFormat.setIndexBlockSize(job1, d);
AccumuloFileOutputFormat.setCompressionType(job1, e);
+ AccumuloFileOutputFormat.setSampler(job1, samplerConfig);
AccumuloConfiguration acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job1.getConfiguration());
@@ -51,12 +58,16 @@ public class AccumuloFileOutputFormatTest {
assertEquals(50l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
assertEquals(10l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+ assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
a = 17;
b = 1300l;
c = 150l;
d = 110l;
e = "lzo";
+ samplerConfig = new SamplerConfiguration(RowSampler.class.getName());
+ samplerConfig.addOption("hasher", "md5");
+ samplerConfig.addOption("modulus", "100003");
Job job2 = Job.getInstance();
AccumuloFileOutputFormat.setReplication(job2, a);
@@ -64,6 +75,7 @@ public class AccumuloFileOutputFormatTest {
AccumuloFileOutputFormat.setDataBlockSize(job2, c);
AccumuloFileOutputFormat.setIndexBlockSize(job2, d);
AccumuloFileOutputFormat.setCompressionType(job2, e);
+ AccumuloFileOutputFormat.setSampler(job2, samplerConfig);
acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job2.getConfiguration());
@@ -72,6 +84,7 @@ public class AccumuloFileOutputFormatTest {
assertEquals(150l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
assertEquals(110l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+ assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 6f89454..66978dd 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Random;
import junit.framework.TestCase;
-
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.file.blockfile.ABlockWriter;
@@ -77,7 +76,7 @@ public class MultiLevelIndexTest extends TestCase {
FSDataInputStream in = new FSDataInputStream(bais);
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance(), aconf);
- Reader reader = new Reader(_cbr, RFile.RINDEX_VER_7);
+ Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
BlockRead rootIn = _cbr.getMetaBlock("root");
reader.readFields(rootIn);
rootIn.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 2e2b346..ab98f49 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -28,16 +28,21 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
@@ -57,6 +62,10 @@ import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.sample.impl.SamplerFactory;
import org.apache.accumulo.core.security.crypto.CryptoTest;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
@@ -68,14 +77,37 @@ import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import com.google.common.primitives.Bytes;
public class RFileTest {
+ public static class SampleIE extends BaseIteratorEnvironment {
+
+ private SamplerConfiguration samplerConfig;
+
+ SampleIE(SamplerConfiguration config) {
+ this.samplerConfig = config;
+ }
+
+ @Override
+ public boolean isSamplingEnabled() {
+ return samplerConfig != null;
+ }
+
+ @Override
+ public SamplerConfiguration getSamplerConfiguration() {
+ return samplerConfig;
+ }
+ }
+
private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
@Rule
@@ -193,7 +225,15 @@ public class RFileTest {
baos = new ByteArrayOutputStream();
dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration);
- writer = new RFile.Writer(_cbw, blockSize, 1000);
+
+ SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration);
+ Sampler sampler = null;
+
+ if (samplerConfig != null) {
+ sampler = SamplerFactory.newSampler(samplerConfig, accumuloConfiguration);
+ }
+
+ writer = new RFile.Writer(_cbw, blockSize, 1000, samplerConfig, sampler);
if (startDLG)
writer.startDefaultLocalityGroup();
@@ -221,7 +261,6 @@ public class RFileTest {
}
public void openReader(boolean cfsi) throws IOException {
-
int fileLength = 0;
byte[] data = null;
data = baos.toByteArray();
@@ -1206,7 +1245,6 @@ public class RFileTest {
@Test
public void test14() throws IOException {
// test starting locality group after default locality group was started
-
TestRFile trf = new TestRFile(conf);
trf.openWriter(false);
@@ -1558,6 +1596,7 @@ public class RFileTest {
runVersionTest(3);
runVersionTest(4);
runVersionTest(6);
+ runVersionTest(7);
}
private void runVersionTest(int version) throws IOException {
@@ -1762,6 +1801,294 @@ public class RFileTest {
conf = null;
}
+ private Key nk(int r, int c) {
+ String row = String.format("r%06d", r);
+ switch (c) {
+ case 0:
+ return new Key(row, "user", "addr");
+ case 1:
+ return new Key(row, "user", "name");
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private Value nv(int r, int c) {
+ switch (c) {
+ case 0:
+ return new Value(("123" + r + " west st").getBytes());
+ case 1:
+ return new Value(("bob" + r).getBytes());
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private static void hash(Hasher hasher, Key key, Value val) {
+ hasher.putBytes(key.getRowData().toArray());
+ hasher.putBytes(key.getColumnFamilyData().toArray());
+ hasher.putBytes(key.getColumnQualifierData().toArray());
+ hasher.putBytes(key.getColumnVisibilityData().toArray());
+ hasher.putLong(key.getTimestamp());
+ hasher.putBoolean(key.isDeleted());
+ hasher.putBytes(val.get());
+ }
+
+ private static void add(TestRFile trf, Key key, Value val, Hasher dataHasher, List<Entry<Key,Value>> sample, Sampler sampler) throws IOException {
+ if (sampler.accept(key)) {
+ sample.add(new AbstractMap.SimpleImmutableEntry<Key,Value>(key, val));
+ }
+
+ hash(dataHasher, key, val);
+
+ trf.writer.append(key, val);
+ }
+
+ private List<Entry<Key,Value>> toList(SortedKeyValueIterator<Key,Value> sample) throws IOException {
+ ArrayList<Entry<Key,Value>> ret = new ArrayList<>();
+
+ while (sample.hasTop()) {
+ ret.add(new AbstractMap.SimpleImmutableEntry<Key,Value>(new Key(sample.getTopKey()), new Value(sample.getTopValue())));
+ sample.next();
+ }
+
+ return ret;
+ }
+
+ private void checkSample(SortedKeyValueIterator<Key,Value> sample, List<Entry<Key,Value>> sampleData) throws IOException {
+ checkSample(sample, sampleData, EMPTY_COL_FAMS, false);
+ }
+
+ private void checkSample(SortedKeyValueIterator<Key,Value> sample, List<Entry<Key,Value>> sampleData, Collection<ByteSequence> columnFamilies,
+ boolean inclusive) throws IOException {
+
+ sample.seek(new Range(), columnFamilies, inclusive);
+ Assert.assertEquals(sampleData, toList(sample));
+
+ Random rand = new Random();
+ long seed = rand.nextLong();
+ rand = new Random(seed);
+
+ // randomly seek sample iterator and verify
+ for (int i = 0; i < 33; i++) {
+ Key startKey = null;
+ boolean startInclusive = false;
+ int startIndex = 0;
+
+ Key endKey = null;
+ boolean endInclusive = false;
+ int endIndex = sampleData.size();
+
+ if (rand.nextBoolean()) {
+ startIndex = rand.nextInt(sampleData.size());
+ startKey = sampleData.get(startIndex).getKey();
+ startInclusive = rand.nextBoolean();
+ if (!startInclusive) {
+ startIndex++;
+ }
+ }
+
+ if (startIndex < endIndex && rand.nextBoolean()) {
+ endIndex -= rand.nextInt(endIndex - startIndex);
+ endKey = sampleData.get(endIndex - 1).getKey();
+ endInclusive = rand.nextBoolean();
+ if (!endInclusive) {
+ endIndex--;
+ }
+ } else if (startIndex == endIndex) {
+ endInclusive = rand.nextBoolean();
+ }
+
+ sample.seek(new Range(startKey, startInclusive, endKey, endInclusive), columnFamilies, inclusive);
+ Assert.assertEquals("seed: " + seed, sampleData.subList(startIndex, endIndex), toList(sample));
+ }
+ }
+
+ @Test
+ public void testSample() throws IOException {
+
+ int num = 10000;
+
+ for (int sampleBufferSize : new int[] {1 << 10, 1 << 20}) {
+ // force sample buffer to flush for smaller data
+ RFile.setSampleBufferSize(sampleBufferSize);
+
+ for (int modulus : new int[] {19, 103, 1019}) {
+ Hasher dataHasher = Hashing.md5().newHasher();
+ List<Entry<Key,Value>> sampleData = new ArrayList<Entry<Key,Value>>();
+
+ ConfigurationCopy sampleConf = new ConfigurationCopy(conf == null ? AccumuloConfiguration.getDefaultConfiguration() : conf);
+ sampleConf.set(Property.TABLE_SAMPLER, RowSampler.class.getName());
+ sampleConf.set(Property.TABLE_SAMPLER_OPTS + "hasher", "murmur3_32");
+ sampleConf.set(Property.TABLE_SAMPLER_OPTS + "modulus", modulus + "");
+
+ Sampler sampler = SamplerFactory.newSampler(SamplerConfigurationImpl.newSamplerConfig(sampleConf), sampleConf);
+
+ TestRFile trf = new TestRFile(sampleConf);
+
+ trf.openWriter();
+
+ for (int i = 0; i < num; i++) {
+ add(trf, nk(i, 0), nv(i, 0), dataHasher, sampleData, sampler);
+ add(trf, nk(i, 1), nv(i, 1), dataHasher, sampleData, sampler);
+ }
+
+ HashCode expectedDataHash = dataHasher.hash();
+
+ trf.closeWriter();
+
+ trf.openReader();
+
+ FileSKVIterator sample = trf.reader.getSample(SamplerConfigurationImpl.newSamplerConfig(sampleConf));
+
+ checkSample(sample, sampleData);
+
+ Assert.assertEquals(expectedDataHash, hash(trf.reader));
+
+ SampleIE ie = new SampleIE(SamplerConfigurationImpl.newSamplerConfig(sampleConf).toSamplerConfiguration());
+
+ for (int i = 0; i < 3; i++) {
+ // test opening and closing deep copies a few times.
+ trf.reader.closeDeepCopies();
+
+ sample = trf.reader.getSample(SamplerConfigurationImpl.newSamplerConfig(sampleConf));
+ SortedKeyValueIterator<Key,Value> sampleDC1 = sample.deepCopy(ie);
+ SortedKeyValueIterator<Key,Value> sampleDC2 = sample.deepCopy(ie);
+ SortedKeyValueIterator<Key,Value> sampleDC3 = trf.reader.deepCopy(ie);
+ SortedKeyValueIterator<Key,Value> allDC1 = sampleDC1.deepCopy(new SampleIE(null));
+ SortedKeyValueIterator<Key,Value> allDC2 = sample.deepCopy(new SampleIE(null));
+
+ Assert.assertEquals(expectedDataHash, hash(allDC1));
+ Assert.assertEquals(expectedDataHash, hash(allDC2));
+
+ checkSample(sample, sampleData);
+ checkSample(sampleDC1, sampleData);
+ checkSample(sampleDC2, sampleData);
+ checkSample(sampleDC3, sampleData);
+ }
+
+ trf.reader.closeDeepCopies();
+
+ trf.closeReader();
+ }
+ }
+ }
+
+ private HashCode hash(SortedKeyValueIterator<Key,Value> iter) throws IOException {
+ Hasher dataHasher = Hashing.md5().newHasher();
+ iter.seek(new Range(), EMPTY_COL_FAMS, false);
+ while (iter.hasTop()) {
+ hash(dataHasher, iter.getTopKey(), iter.getTopValue());
+ iter.next();
+ }
+
+ return dataHasher.hash();
+ }
+
+ @Test
+ public void testSampleLG() throws IOException {
+
+ int num = 5000;
+
+ for (int sampleBufferSize : new int[] {1 << 10, 1 << 20}) {
+ // force sample buffer to flush for smaller data
+ RFile.setSampleBufferSize(sampleBufferSize);
+
+ for (int modulus : new int[] {19, 103, 1019}) {
+ List<Entry<Key,Value>> sampleDataLG1 = new ArrayList<Entry<Key,Value>>();
+ List<Entry<Key,Value>> sampleDataLG2 = new ArrayList<Entry<Key,Value>>();
+
+ ConfigurationCopy sampleConf = new ConfigurationCopy(conf == null ? AccumuloConfiguration.getDefaultConfiguration() : conf);
+ sampleConf.set(Property.TABLE_SAMPLER, RowSampler.class.getName());
+ sampleConf.set(Property.TABLE_SAMPLER_OPTS + "hasher", "murmur3_32");
+ sampleConf.set(Property.TABLE_SAMPLER_OPTS + "modulus", modulus + "");
+
+ Sampler sampler = SamplerFactory.newSampler(SamplerConfigurationImpl.newSamplerConfig(sampleConf), sampleConf);
+
+ TestRFile trf = new TestRFile(sampleConf);
+
+ trf.openWriter(false, 1000);
+
+ trf.writer.startNewLocalityGroup("meta-lg", ncfs("metaA", "metaB"));
+ for (int r = 0; r < num; r++) {
+ String row = String.format("r%06d", r);
+ Key k1 = new Key(row, "metaA", "q9", 7);
+ Key k2 = new Key(row, "metaB", "q8", 7);
+ Key k3 = new Key(row, "metaB", "qA", 7);
+
+ Value v1 = new Value(("" + r).getBytes());
+ Value v2 = new Value(("" + r * 93).getBytes());
+ Value v3 = new Value(("" + r * 113).getBytes());
+
+ if (sampler.accept(k1)) {
+ sampleDataLG1.add(new AbstractMap.SimpleImmutableEntry<Key,Value>(k1, v1));
+ sampleDataLG1.add(new AbstractMap.SimpleImmutableEntry<Key,Value>(k2, v2));
+ sampleDataLG1.add(new AbstractMap.SimpleImmutableEntry<Key,Value>(k3, v3));
+ }
+
+ trf.writer.append(k1, v1);
+ trf.writer.append(k2, v2);
+ trf.writer.append(k3, v3);
+ }
+
+ trf.writer.startDefaultLocalityGroup();
+
+ for (int r = 0; r < num; r++) {
+ String row = String.format("r%06d", r);
+ Key k1 = new Key(row, "dataA", "q9", 7);
+
+ Value v1 = new Value(("" + r).getBytes());
+
+ if (sampler.accept(k1)) {
+ sampleDataLG2.add(new AbstractMap.SimpleImmutableEntry<Key,Value>(k1, v1));
+ }
+
+ trf.writer.append(k1, v1);
+ }
+
+ trf.closeWriter();
+
+ Assert.assertTrue(sampleDataLG1.size() > 0);
+ Assert.assertTrue(sampleDataLG2.size() > 0);
+
+ trf.openReader(false);
+ FileSKVIterator sample = trf.reader.getSample(SamplerConfigurationImpl.newSamplerConfig(sampleConf));
+
+ checkSample(sample, sampleDataLG1, ncfs("metaA", "metaB"), true);
+ checkSample(sample, sampleDataLG1, ncfs("metaA"), true);
+ checkSample(sample, sampleDataLG1, ncfs("metaB"), true);
+ checkSample(sample, sampleDataLG1, ncfs("dataA"), false);
+
+ checkSample(sample, sampleDataLG2, ncfs("metaA", "metaB"), false);
+ checkSample(sample, sampleDataLG2, ncfs("dataA"), true);
+
+ ArrayList<Entry<Key,Value>> allSampleData = new ArrayList<Entry<Key,Value>>();
+ allSampleData.addAll(sampleDataLG1);
+ allSampleData.addAll(sampleDataLG2);
+
+ Collections.sort(allSampleData, new Comparator<Entry<Key,Value>>() {
+ @Override
+ public int compare(Entry<Key,Value> o1, Entry<Key,Value> o2) {
+ return o1.getKey().compareTo(o2.getKey());
+ }
+ });
+
+ checkSample(sample, allSampleData, ncfs("dataA", "metaA"), true);
+ checkSample(sample, allSampleData, EMPTY_COL_FAMS, false);
+
+ trf.closeReader();
+ }
+ }
+ }
+
+ @Test
+ public void testEncSample() throws IOException {
+ conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
+ testSample();
+ testSampleLG();
+ conf = null;
+ }
+
@Test
public void testCryptoDoesntLeakSensitive() throws IOException {
conf = setAndGetAccumuloConfig(CryptoTest.CRYPTO_ON_CONF);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java b/core/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java
index 316823c..3c68196 100644
--- a/core/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java
@@ -18,17 +18,16 @@ package org.apache.accumulo.core.iterators;
import java.io.IOException;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.system.MapFileIterator;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-public class DefaultIteratorEnvironment implements IteratorEnvironment {
+public class DefaultIteratorEnvironment extends BaseIteratorEnvironment {
AccumuloConfiguration conf;
@@ -53,23 +52,7 @@ public class DefaultIteratorEnvironment implements IteratorEnvironment {
}
@Override
- public IteratorScope getIteratorScope() {
- throw new UnsupportedOperationException();
+ public boolean isSamplingEnabled() {
+ return false;
}
-
- @Override
- public boolean isFullMajorCompaction() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Authorizations getAuthorizations() {
- throw new UnsupportedOperationException();
- }
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowIteratorTest.java
index 74f7462..5455aa6 100644
--- a/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowIteratorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowIteratorTest.java
@@ -22,14 +22,12 @@ import java.io.IOException;
import java.util.Collections;
import java.util.TreeMap;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.system.CountingIterator;
-import org.apache.accumulo.core.security.Authorizations;
import org.junit.Test;
public class FirstEntryInRowIteratorTest {
@@ -39,38 +37,7 @@ public class FirstEntryInRowIteratorTest {
org.apache.accumulo.core.iterators.SortedMapIterator source = new SortedMapIterator(sourceMap);
CountingIterator counter = new CountingIterator(source);
FirstEntryInRowIterator feiri = new FirstEntryInRowIterator();
- IteratorEnvironment env = new IteratorEnvironment() {
-
- @Override
- public AccumuloConfiguration getConfig() {
- return null;
- }
-
- @Override
- public IteratorScope getIteratorScope() {
- return null;
- }
-
- @Override
- public boolean isFullMajorCompaction() {
- return false;
- }
-
- @Override
- public void registerSideChannel(SortedKeyValueIterator<Key,Value> arg0) {
-
- }
-
- @Override
- public Authorizations getAuthorizations() {
- return null;
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String arg0) throws IOException {
- return null;
- }
- };
+ IteratorEnvironment env = new BaseIteratorEnvironment();
feiri.init(counter, Collections.singletonMap(FirstEntryInRowIterator.NUM_SCANS_STRING_NAME, Integer.toString(numScans)), env);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/iterators/SortedMapIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/SortedMapIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/SortedMapIteratorTest.java
new file mode 100644
index 0000000..7557b9a
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/SortedMapIteratorTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.junit.Test;
+
+public class SortedMapIteratorTest {
+
+ @Test(expected = SampleNotPresentException.class)
+ public void testSampleNotPresent() {
+ SortedMapIterator smi = new SortedMapIterator(new TreeMap<Key,Value>());
+ smi.deepCopy(new BaseIteratorEnvironment() {
+ @Override
+ public boolean isSamplingEnabled() {
+ return true;
+ }
+
+ @Override
+ public SamplerConfiguration getSamplerConfiguration() {
+ return new SamplerConfiguration(RowSampler.class.getName());
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/iterators/user/RowDeletingIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/RowDeletingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowDeletingIteratorTest.java
index a3c1cca..bdaf112 100644
--- a/core/src/test/java/org/apache/accumulo/core/iterators/user/RowDeletingIteratorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowDeletingIteratorTest.java
@@ -16,30 +16,26 @@
*/
package org.apache.accumulo.core.iterators.user;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.TreeMap;
-import junit.framework.TestCase;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.SortedMapIterator;
import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
+import junit.framework.TestCase;
+
public class RowDeletingIteratorTest extends TestCase {
- public static class TestIE implements IteratorEnvironment {
+ public static class TestIE extends BaseIteratorEnvironment {
private IteratorScope scope;
private boolean fmc;
@@ -50,11 +46,6 @@ public class RowDeletingIteratorTest extends TestCase {
}
@Override
- public AccumuloConfiguration getConfig() {
- return null;
- }
-
- @Override
public IteratorScope getIteratorScope() {
return scope;
}
@@ -63,19 +54,6 @@ public class RowDeletingIteratorTest extends TestCase {
public boolean isFullMajorCompaction() {
return fmc;
}
-
- @Override
- public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
- return null;
- }
-
- @Override
- public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {}
-
- @Override
- public Authorizations getAuthorizations() {
- return null;
- }
}
Key nk(String row, String cf, String cq, long time) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
index 8f228f5..d9aa174 100644
--- a/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
@@ -16,26 +16,15 @@
*/
package org.apache.accumulo.core.iterators.user;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.SortedMapIterator;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.commons.collections.BufferOverflowException;
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -43,23 +32,20 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.commons.collections.BufferOverflowException;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
public class RowEncodingIteratorTest {
- private static final class DummyIteratorEnv implements IteratorEnvironment {
- @Override
- public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
- return null;
- }
-
- @Override
- public AccumuloConfiguration getConfig() {
- return null;
- }
-
+ private static final class DummyIteratorEnv extends BaseIteratorEnvironment {
@Override
public IteratorUtil.IteratorScope getIteratorScope() {
return IteratorUtil.IteratorScope.scan;
@@ -69,16 +55,6 @@ public class RowEncodingIteratorTest {
public boolean isFullMajorCompaction() {
return false;
}
-
- @Override
- public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
-
- }
-
- @Override
- public Authorizations getAuthorizations() {
- return null;
- }
}
private static final class RowEncodingIteratorImpl extends RowEncodingIterator {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
index 1f4d6e7..97ebe5c 100644
--- a/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
@@ -34,7 +34,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -599,7 +599,7 @@ public class TransformingIteratorTest {
public static class ColFamReversingCompactionKeyTransformingIterator extends ColFamReversingKeyTransformingIterator {
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- env = new MajCIteratorEnvironmentAdapter(env);
+ env = new MajCIteratorEnvironmentAdapter();
super.init(source, options, env);
}
}
@@ -639,7 +639,7 @@ public class TransformingIteratorTest {
public static class IllegalVisCompactionKeyTransformingIterator extends IllegalVisKeyTransformingIterator {
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- env = new MajCIteratorEnvironmentAdapter(env);
+ env = new MajCIteratorEnvironmentAdapter();
super.init(source, options, env);
}
}
@@ -665,7 +665,7 @@ public class TransformingIteratorTest {
public static class BadVisCompactionKeyTransformingIterator extends BadVisKeyTransformingIterator {
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- env = new MajCIteratorEnvironmentAdapter(env);
+ env = new MajCIteratorEnvironmentAdapter();
super.init(source, options, env);
}
}
@@ -711,41 +711,10 @@ public class TransformingIteratorTest {
}
}
- private static class MajCIteratorEnvironmentAdapter implements IteratorEnvironment {
- private IteratorEnvironment delegate;
-
- public MajCIteratorEnvironmentAdapter(IteratorEnvironment delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
- return delegate.reserveMapFileReader(mapFileName);
- }
-
- @Override
- public AccumuloConfiguration getConfig() {
- return delegate.getConfig();
- }
-
+ private static class MajCIteratorEnvironmentAdapter extends BaseIteratorEnvironment {
@Override
public IteratorScope getIteratorScope() {
return IteratorScope.majc;
}
-
- @Override
- public boolean isFullMajorCompaction() {
- return delegate.isFullMajorCompaction();
- }
-
- @Override
- public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
- delegate.registerSideChannel(iter);
- }
-
- @Override
- public Authorizations getAuthorizations() {
- return null;
- }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/core/src/test/resources/org/apache/accumulo/core/file/rfile/ver_7.rf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/org/apache/accumulo/core/file/rfile/ver_7.rf b/core/src/test/resources/org/apache/accumulo/core/file/rfile/ver_7.rf
new file mode 100644
index 0000000..7d2c9f7
Binary files /dev/null and b/core/src/test/resources/org/apache/accumulo/core/file/rfile/ver_7.rf differ
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
index 32f19fe..b62983a 100644
--- a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
+++ b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
@@ -59,6 +59,8 @@ include::chapters/ssl.txt[]
include::chapters/kerberos.txt[]
+include::chapters/sampling.txt[]
+
include::chapters/administration.txt[]
include::chapters/multivolume.txt[]
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/docs/src/main/asciidoc/chapters/sampling.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/sampling.txt b/docs/src/main/asciidoc/chapters/sampling.txt
new file mode 100644
index 0000000..f035c56
--- /dev/null
+++ b/docs/src/main/asciidoc/chapters/sampling.txt
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+== Sampling
+
+=== Overview
+
+Accumulo has the ability to generate and scan a per-table set of sample data.
+This sample data is kept up to date as a table is mutated. Which key values
+are placed in the sample data is configurable per table.
+
+This feature can be used for query estimation and optimization. As an example
+of estimation, assume an Accumulo table is configured to generate a sample
+containing one millionth of a table's data. If a query executed against the
+sample returns one thousand results, then the same query against all the data
+would probably return around a billion results. A nice property of having
+Accumulo generate the sample is that it's always up to date, so estimates are
+accurate even when querying the most recently written data.
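+
+A minimal sketch of such an estimate (illustrative only; +connector+ and the
+table name are placeholders, and Guava's +Iterables+ does the counting):
+
+[source,java]
+----
+// assumes the table's sampler keeps roughly one millionth of the data
+Scanner scanner = connector.createScanner("mytable", Authorizations.EMPTY);
+scanner.setSamplerConfiguration(
+    connector.tableOperations().getSamplerConfiguration("mytable"));
+long sampleCount = Iterables.size(scanner); // e.g. one thousand results
+long estimate = sampleCount * 1000000L;     // scale up: ~one billion
+----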
+
+An example of a query optimization is an iterator using sample data to get an
+estimate, and then making decisions based on the estimate.
+
+=== Configuring
+
+In order to use sampling, an Accumulo table must be configured with a class
+that implements +org.apache.accumulo.core.sample.Sampler+ along with options
+for that class. For guidance on implementing a Sampler, see that interface's
+javadoc. Accumulo provides a few implementations out of the box. For
+information on how to use the samplers that ship with Accumulo, look in the
+package `org.apache.accumulo.core.sample` and consult the javadoc of the
+classes there. See +README.sample+ and +SampleExample.java+ for examples of
+how to configure a Sampler on a table.
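+
+As a rough sketch (not part of this patch; +connector+ and the table name are
+placeholders), configuring a +RowSampler+ through the new +TableOperations+
+methods might look like:
+
+[source,java]
+----
+// keep rows where murmur3_32(row) % 1009 == 0 in the sample
+SamplerConfiguration sc = new SamplerConfiguration(RowSampler.class.getName());
+sc.addOption("hasher", "murmur3_32");
+sc.addOption("modulus", "1009");
+connector.tableOperations().setSamplerConfiguration("mytable", sc);
+----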
+
+Once a table is configured with a sampler, all writes after that point will
+generate sample data. Sample data will not be present for data written before
+sampling was configured. A compaction can be initiated that only compacts the
+files in the table that do not have sample data; the example readme shows how
+to do this.
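+
+For example, the shell command used in the example readme to compact only the
+files missing sample data is:
+
+    compact -t sampex --sf-no-sample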
+
+If the sampling configuration of a table is changed, then Accumulo will start
+generating new sample data with the new configuration. However, old data will
+still have sample data generated with the previous configuration. A selective
+compaction can also be issued in this case to regenerate the sample data.
+
+=== Scanning sample data
+
+In order to scan sample data, use the +setSamplerConfiguration(...)+ method on
++Scanner+ or +BatchScanner+. Consult that method's javadoc for more
+information.
+
+Sample data can also be scanned from within an Accumulo
++SortedKeyValueIterator+. To see how to do this, look at the example iterator
+referenced in README.sample. Also, consult the javadoc on
++org.apache.accumulo.core.iterators.IteratorEnvironment.cloneWithSamplingEnabled()+.
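+
+Sketching the pattern inside an iterator (illustrative only; assumes the
+iterator extends +WrappingIterator+, keeps a +sampleIter+ field, and that
+sampling is configured on the table):
+
+[source,java]
+----
+@Override
+public void init(SortedKeyValueIterator<Key,Value> source,
+    Map<String,String> options, IteratorEnvironment env) throws IOException {
+  super.init(source, options, env);
+  // throws SampleNotPresentException when sampling is not configured
+  IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
+  // this deep copy of the source will only see sample data
+  sampleIter = source.deepCopy(sampleEnv);
+}
+----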
+
+MapReduce jobs using the +AccumuloInputFormat+ can also read sample data. See
+the javadoc for the +setSamplerConfiguration()+ method on
++AccumuloInputFormat+.
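+
+A short sketch (the exact method signature is documented in the javadoc
+referenced above; +samplerConfig+ is a +SamplerConfiguration+):
+
+[source,java]
+----
+Job job = Job.getInstance();
+// scans run by this job will read sample data instead of all of the data
+AccumuloInputFormat.setSamplerConfiguration(job, samplerConfig);
+----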
+
+Scans over sample data will throw a +SampleNotPresentException+ in the
+following cases:
+
+. sample data is not present
+. sample data is present but was generated with multiple configurations
+. sample data is partially present
+
+So a scan over sample data can only succeed if all data written has sample data
+generated with the same configuration.
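+
+A client can catch the exception and fall back to scanning all of the data; a
+sketch (names illustrative, and note the exception may be thrown while
+iterating):
+
+[source,java]
+----
+long count;
+Scanner scanner = connector.createScanner("mytable", Authorizations.EMPTY);
+scanner.setSamplerConfiguration(sc);
+try {
+  count = Iterables.size(scanner);
+} catch (SampleNotPresentException e) {
+  // fall back to a scan over all of the data
+  count = Iterables.size(connector.createScanner("mytable", Authorizations.EMPTY));
+}
+----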
+
+=== Bulk import
+
+When generating rfiles to bulk import into Accumulo, those rfiles can contain
+sample data. To use this feature, look at the javadoc on the
++AccumuloFileOutputFormat.setSampler(...)+ method.
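+
+Mirroring the tests in this patch, a sketch of attaching a sampler when
+writing rfiles for bulk import:
+
+[source,java]
+----
+Job job = Job.getInstance();
+SamplerConfiguration sc = new SamplerConfiguration(RowSampler.class.getName());
+sc.addOption("hasher", "murmur3_32");
+sc.addOption("modulus", "1009");
+// rfiles produced by this job will also contain sample data
+AccumuloFileOutputFormat.setSampler(job, sc);
+----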
+
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/docs/src/main/resources/examples/README
----------------------------------------------------------------------
diff --git a/docs/src/main/resources/examples/README b/docs/src/main/resources/examples/README
index 4211050..03c2e05 100644
--- a/docs/src/main/resources/examples/README
+++ b/docs/src/main/resources/examples/README
@@ -80,6 +80,8 @@ features of Apache Accumulo.
README.rowhash: Using MapReduce to read a table and write to a new
column in the same table.
+ README.sample: Building and using sample data in Accumulo.
+
README.shard: Using the intersecting iterator with a term index
partitioned by document.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/docs/src/main/resources/examples/README.sample
----------------------------------------------------------------------
diff --git a/docs/src/main/resources/examples/README.sample b/docs/src/main/resources/examples/README.sample
new file mode 100644
index 0000000..15288aa
--- /dev/null
+++ b/docs/src/main/resources/examples/README.sample
@@ -0,0 +1,192 @@
+Title: Apache Accumulo Sampling Example
+Notice: Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ .
+ http://www.apache.org/licenses/LICENSE-2.0
+ .
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Basic Sampling Example
+----------------------
+
+Accumulo supports building a set of sample data that can be efficiently
+accessed by scanners. What data is included in the sample set is configurable.
+Below, some data representing documents are inserted.
+
+ root@instance sampex> createtable sampex
+ root@instance sampex> insert 9255 doc content 'abcde'
+ root@instance sampex> insert 9255 doc url file://foo.txt
+ root@instance sampex> insert 8934 doc content 'accumulo scales'
+ root@instance sampex> insert 8934 doc url file://accumulo_notes.txt
+ root@instance sampex> insert 2317 doc content 'milk, eggs, bread, parmigiano-reggiano'
+ root@instance sampex> insert 2317 doc url file://groceries/9.txt
+ root@instance sampex> insert 3900 doc content 'EC2 ate my homework'
+    root@instance sampex> insert 3900 doc url file://final_project.txt
+
+Below, the table sampex is configured to build a sample set. The configuration
+causes Accumulo to include any row where `murmur3_32(row) % 3 == 0` in the
+table's sample data.
+
+ root@instance sampex> config -t sampex -s table.sampler.opt.hasher=murmur3_32
+ root@instance sampex> config -t sampex -s table.sampler.opt.modulus=3
+ root@instance sampex> config -t sampex -s table.sampler=org.apache.accumulo.core.sample.RowSampler
+
+Below, attempting to scan the sample returns an error. This is because data
+was inserted before the sample set was configured.
+
+ root@instance sampex> scan --sample
+ 2015-09-09 12:21:50,643 [shell.Shell] ERROR: org.apache.accumulo.core.client.SampleNotPresentException: Table sampex(ID:2) does not have sampling configured or built
+
+To remedy this problem, the following command will flush in-memory data and
+compact any files that do not contain the correct sample data.
+
+ root@instance sampex> compact -t sampex --sf-no-sample
+
+After the compaction, the sample scan works.
+
+ root@instance sampex> scan --sample
+ 2317 doc:content [] milk, eggs, bread, parmigiano-reggiano
+ 2317 doc:url [] file://groceries/9.txt
+
+The commands below show that updates to data in the sample are seen when
+scanning the sample.
+
+ root@instance sampex> insert 2317 doc content 'milk, eggs, bread, parmigiano-reggiano, butter'
+ root@instance sampex> scan --sample
+ 2317 doc:content [] milk, eggs, bread, parmigiano-reggiano, butter
+ 2317 doc:url [] file://groceries/9.txt
+
+In order to make scanning the sample fast, sample data is partitioned as data
+is written to Accumulo. This means that if the sample configuration is changed,
+data written previously is partitioned using a different criterion. Accumulo
+will detect this situation and fail sample scans. The commands below show this
+failure and how to fix the problem with a compaction.
+
+ root@instance sampex> config -t sampex -s table.sampler.opt.modulus=2
+ root@instance sampex> scan --sample
+ 2015-09-09 12:22:51,058 [shell.Shell] ERROR: org.apache.accumulo.core.client.SampleNotPresentException: Table sampex(ID:2) does not have sampling configured or built
+ root@instance sampex> compact -t sampex --sf-no-sample
+ 2015-09-09 12:23:07,242 [shell.Shell] INFO : Compaction of table sampex started for given range
+ root@instance sampex> scan --sample
+ 2317 doc:content [] milk, eggs, bread, parmigiano-reggiano
+ 2317 doc:url [] file://groceries/9.txt
+ 3900 doc:content [] EC2 ate my homework
+    3900 doc:url [] file://final_project.txt
+ 9255 doc:content [] abcde
+ 9255 doc:url [] file://foo.txt
+
+The example above is replicated in a Java program using the Accumulo API.
+Below is the program name and the command to run it.
+
+ ./bin/accumulo org.apache.accumulo.examples.simple.sample.SampleExample -i instance -z localhost -u root -p secret
+
+The commands below look under the hood to give some insight into how this
+feature works. The commands determine what files the sampex table is using.
+
+ root@instance sampex> tables -l
+ accumulo.metadata => !0
+ accumulo.replication => +rep
+ accumulo.root => +r
+ sampex => 2
+ trace => 1
+ root@instance sampex> scan -t accumulo.metadata -c file -b 2 -e 2<
+ 2< file:hdfs://localhost:10000/accumulo/tables/2/default_tablet/A000000s.rf [] 702,8
+
+Below is the output of running `accumulo rfile-info` on the file above. It
+shows the rfile has a normal default locality group and a sample default
+locality group. The output also shows the configuration used to create the
+sample locality group. The sample configuration within an rfile must match the
+table's sample configuration for sample scans to work.
+
+ $ ./bin/accumulo rfile-info hdfs://localhost:10000/accumulo/tables/2/default_tablet/A000000s.rf
+ Reading file: hdfs://localhost:10000/accumulo/tables/2/default_tablet/A000000s.rf
+ RFile Version : 8
+
+ Locality group : <DEFAULT>
+ Start block : 0
+ Num blocks : 1
+ Index level 0 : 35 bytes 1 blocks
+ First key : 2317 doc:content [] 1437672014986 false
+ Last key : 9255 doc:url [] 1437672014875 false
+ Num entries : 8
+ Column families : [doc]
+
+ Sample Configuration :
+ Sampler class : org.apache.accumulo.core.sample.RowSampler
+ Sampler options : {hasher=murmur3_32, modulus=2}
+
+ Sample Locality group : <DEFAULT>
+ Start block : 0
+ Num blocks : 1
+ Index level 0 : 36 bytes 1 blocks
+ First key : 2317 doc:content [] 1437672014986 false
+ Last key : 9255 doc:url [] 1437672014875 false
+ Num entries : 6
+ Column families : [doc]
+
+ Meta block : BCFile.index
+ Raw size : 4 bytes
+ Compressed size : 12 bytes
+ Compression type : gz
+
+ Meta block : RFile.index
+ Raw size : 309 bytes
+ Compressed size : 176 bytes
+ Compression type : gz
+
+
+Shard Sampling Example
+----------------------
+
+`README.shard` shows how to index and search files using Accumulo. That
+example indexes documents into a table named `shard`, placing the document
+name in the column qualifier. For a sample of this index to be useful, it
+must contain all of the index entries for each sampled document. To
+accomplish this, the following commands build a sample for the shard table
+based on the column qualifier; an equivalent Java API sketch follows the
+shell session.
+
+ root@instance shard> config -t shard -s table.sampler.opt.hasher=murmur3_32
+ root@instance shard> config -t shard -s table.sampler.opt.modulus=101
+ root@instance shard> config -t shard -s table.sampler.opt.qualifier=true
+ root@instance shard> config -t shard -s table.sampler=org.apache.accumulo.core.sample.RowColumnSampler
+ root@instance shard> compact -t shard --sf-no-sample -w
+ 2015-07-23 15:00:09,280 [shell.Shell] INFO : Compacting table ...
+ 2015-07-23 15:00:10,134 [shell.Shell] INFO : Compaction of table shard completed for given range
+
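+The same sampler can be configured through the Java API; a minimal sketch,
+assuming an existing Connector named conn:
+
+    SamplerConfiguration sc = new SamplerConfiguration(RowColumnSampler.class.getName());
+    sc.setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "101", "qualifier", "true"));
+    conn.tableOperations().setSamplerConfiguration("shard", sc);
+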
+After enabling sampling, the command below counts the number of documents in
+the sample containing the words `import` and `int`.
+
+ $ ./bin/accumulo org.apache.accumulo.examples.simple.shard.Query --sample -i instance16 -z localhost -t shard -u root -p secret import int | fgrep '.java' | wc
+ 11 11 1246
+
+The command below counts the total number of documents containing the words
+`import` and `int`.
+
+ $ ./bin/accumulo org.apache.accumulo.examples.simple.shard.Query -i instance16 -z localhost -t shard -u root -p secret import int | fgrep '.java' | wc
+ 1085 1085 118175
+
+A count of 11 out of 1085 total is close to what would be expected for a
+modulus of 101: scaling the sample count back up gives 11 * 101 = 1111, near
+the actual 1085. Querying the sample first therefore provides a quick way to
+estimate how much data the real query will bring back.
+
+Another way to use sample data with the shard example is through a
+specialized iterator. The example source code includes an iterator named
+CutoffIntersectingIterator, which first checks how many documents are found
+in the sample data. If too many are found, it returns nothing; otherwise it
+proceeds to query the full data set. To experiment with this iterator, use
+the following command. The `--sampleCutoff` option below causes the query to
+return nothing if, based on the sample, it appears the query would return
+more than 1000 documents. A configuration sketch follows the command.
+
+ $ ./bin/accumulo org.apache.accumulo.examples.simple.shard.Query --sampleCutoff 1000 -i instance16 -z localhost -t shard -u root -p secret import int | fgrep '.java' | wc
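+
+Programmatically, the cutoff variant is configured like the regular
+intersecting iterator plus one extra option; a sketch assuming a BatchScanner
+named bs and query terms already placed in a Text[] named columns:
+
+    IteratorSetting ii = new IteratorSetting(20, "ii", CutoffIntersectingIterator.class);
+    CutoffIntersectingIterator.setCutoff(ii, 1000); // return nothing if sample implies > 1000 docs
+    IntersectingIterator.setColumnFamilies(ii, columns);
+    bs.addScanIterator(ii);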
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/examples/simple/src/main/java/org/apache/accumulo/examples/simple/sample/SampleExample.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/sample/SampleExample.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/sample/SampleExample.java
new file mode 100644
index 0000000..57d77b1
--- /dev/null
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/sample/SampleExample.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.simple.sample;
+
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.examples.simple.client.RandomBatchWriter;
+import org.apache.accumulo.examples.simple.shard.CutoffIntersectingIterator;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * A simple example of using Accumulo's sampling feature. This example does something similar to what README.sample shows using the shell. Also see
+ * {@link CutoffIntersectingIterator} and README.sample for an example of how to use sample data from within an iterator.
+ */
+public class SampleExample {
+
+ // a compaction strategy that selects only files having no sample data, or sample data created with a configuration different from the table's current one
+ static final CompactionStrategyConfig NO_SAMPLE_STRATEGY = new CompactionStrategyConfig(
+ "org.apache.accumulo.tserver.compaction.strategies.ConfigurableCompactionStrategy").setOptions(Collections.singletonMap("SF_NO_SAMPLE", ""));
+
+ static class Opts extends ClientOnDefaultTable {
+ public Opts() {
+ super("sampex");
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ BatchWriterOpts bwOpts = new BatchWriterOpts();
+ opts.parseArgs(RandomBatchWriter.class.getName(), args, bwOpts);
+
+ Connector conn = opts.getConnector();
+
+ if (!conn.tableOperations().exists(opts.getTableName())) {
+ conn.tableOperations().create(opts.getTableName());
+ } else {
+ System.out.println("Table exists, not doing anything.");
+ return;
+ }
+
+ // write some data
+ BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+ bw.addMutation(createMutation("9225", "abcde", "file://foo.txt"));
+ bw.addMutation(createMutation("8934", "accumulo scales", "file://accumulo_notes.txt"));
+ bw.addMutation(createMutation("2317", "milk, eggs, bread, parmigiano-reggiano", "file://groceries/9/txt"));
+ bw.addMutation(createMutation("3900", "EC2 ate my homework", "file://final_project.txt"));
+ bw.flush();
+
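+ // configure a sampler that selects rows whose murmur3 hash mod 3 is zero, i.e. roughly one third of rows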
+ SamplerConfiguration sc1 = new SamplerConfiguration(RowSampler.class.getName());
+ sc1.setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "3"));
+
+ conn.tableOperations().setSamplerConfiguration(opts.getTableName(), sc1);
+
+ Scanner scanner = conn.createScanner(opts.getTableName(), Authorizations.EMPTY);
+ System.out.println("Scanning all data :");
+ print(scanner);
+ System.out.println();
+
+ System.out.println("Scanning with sampler configuration. Data was written before sampler was set on table, scan should fail.");
+ scanner.setSamplerConfiguration(sc1);
+ try {
+ print(scanner);
+ } catch (SampleNotPresentException e) {
+ System.out.println(" Saw sample not present exception as expected.");
+ }
+ System.out.println();
+
+ // compact table to recreate sample data
+ conn.tableOperations().compact(opts.getTableName(), new CompactionConfig().setCompactionStrategy(NO_SAMPLE_STRATEGY));
+
+ System.out.println("Scanning after compaction (compaction should have created sample data) : ");
+ print(scanner);
+ System.out.println();
+
+ // update a document in the sample data
+ bw.addMutation(createMutation("2317", "milk, eggs, bread, parmigiano-reggiano, butter", "file://groceries/9/txt"));
+ bw.close();
+ System.out.println("Scanning sample after updating content for docId 2317 (should see content change in sample data) : ");
+ print(scanner);
+ System.out.println();
+
+ // change the table's sampling configuration...
+ SamplerConfiguration sc2 = new SamplerConfiguration(RowSampler.class.getName());
+ sc2.setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "2"));
+ conn.tableOperations().setSamplerConfiguration(opts.getTableName(), sc2);
+ // compact table to recreate sample data using new configuration
+ conn.tableOperations().compact(opts.getTableName(), new CompactionConfig().setCompactionStrategy(NO_SAMPLE_STRATEGY));
+
+ System.out.println("Scanning with old sampler configuration. Sample data was created using new configuration with a compaction. Scan should fail.");
+ try {
+ // try scanning with old sampler configuration
+ print(scanner);
+ } catch (SampleNotPresentException e) {
+ System.out.println(" Saw sample not present exception as expected ");
+ }
+ System.out.println();
+
+ // update expected sampler configuration on scanner
+ scanner.setSamplerConfiguration(sc2);
+
+ System.out.println("Scanning with new sampler configuration : ");
+ print(scanner);
+ System.out.println();
+
+ }
+
+ private static void print(Scanner scanner) {
+ for (Entry<Key,Value> entry : scanner) {
+ System.out.println(" " + entry.getKey() + " " + entry.getValue());
+ }
+ }
+
+ private static Mutation createMutation(String docId, String content, String url) {
+ Mutation m = new Mutation(docId);
+ m.put("doc", "context", content);
+ m.put("doc", "url", url);
+ return m;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/CutoffIntersectingIterator.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/CutoffIntersectingIterator.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/CutoffIntersectingIterator.java
new file mode 100644
index 0000000..133e8ae
--- /dev/null
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/CutoffIntersectingIterator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.simple.shard;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+import org.apache.accumulo.core.sample.RowColumnSampler;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This iterator uses a sample built from the Column Qualifier to quickly avoid intersecting iterator queries that may return too many documents.
+ */
+
+public class CutoffIntersectingIterator extends IntersectingIterator {
+
+ private IntersectingIterator sampleII;
+ private int sampleMax;
+ private boolean hasTop;
+
+ public static void setCutoff(IteratorSetting iterCfg, int cutoff) {
+ Preconditions.checkArgument(cutoff >= 0);
+ iterCfg.addOption("cutoff", cutoff + "");
+ }
+
+ @Override
+ public boolean hasTop() {
+ return hasTop && super.hasTop();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+
+ sampleII.seek(range, seekColumnFamilies, inclusive);
+
+ // this check will be redone whenever the iterator stack is torn down and recreated.
+ int count = 0;
+ while (count <= sampleMax && sampleII.hasTop()) {
+ sampleII.next();
+ count++;
+ }
+
+ if (count > sampleMax) {
+ // A real application would probably want to return a key/value indicating too much data. Since this check executes per tablet, some tablets
+ // may return data while others do not, and callers would want an indication for the tablets that returned nothing.
+ hasTop = false;
+ } else {
+ hasTop = true;
+ super.seek(range, seekColumnFamilies, inclusive);
+ }
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+
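+ // clone the environment so that deep copies of the source read the table's sample data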
+ IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
+
+ setMax(sampleEnv, options);
+
+ SortedKeyValueIterator<Key,Value> sampleDC = source.deepCopy(sampleEnv);
+ sampleII = new IntersectingIterator();
+ sampleII.init(sampleDC, options, env);
+
+ }
+
+ static void validateSamplerConfig(SamplerConfiguration sampleConfig) {
+ Preconditions.checkNotNull(sampleConfig);
+ Preconditions.checkArgument(sampleConfig.getSamplerClassName().equals(RowColumnSampler.class.getName()),
+ "Unexpected Sampler " + sampleConfig.getSamplerClassName());
+ Preconditions.checkArgument(sampleConfig.getOptions().get("qualifier").equals("true"), "Expected sample on column qualifier");
+ Preconditions.checkArgument(isNullOrFalse(sampleConfig.getOptions(), "row", "family", "visibility"), "Expected sample on column qualifier only");
+ }
+
+ private void setMax(IteratorEnvironment sampleEnv, Map<String,String> options) {
+ String cutoffValue = options.get("cutoff");
+ SamplerConfiguration sampleConfig = sampleEnv.getSamplerConfiguration();
+
+ // Ensure the sample was constructed in the expected way; otherwise no conclusions can be drawn from it.
+ Preconditions.checkNotNull(cutoffValue, "Expected cutoff option is missing");
+ validateSamplerConfig(sampleConfig);
+
+ int modulus = Integer.parseInt(sampleConfig.getOptions().get("modulus"));
+
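+ // the sample holds roughly 1/modulus of the data, so scale the cutoff down to a per-sample maximum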
+ sampleMax = Math.round(Float.parseFloat(cutoffValue) / modulus);
+ }
+
+ private static boolean isNullOrFalse(Map<String,String> options, String... keys) {
+ for (String key : keys) {
+ String val = options.get(key);
+ if (val != null && val.equals("true")) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Query.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Query.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Query.java
index 41d5dc7..7925855 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Query.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Query.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
@@ -46,16 +47,32 @@ public class Query {
static class Opts extends ClientOnRequiredTable {
@Parameter(description = " term { <term> ... }")
List<String> terms = new ArrayList<String>();
+
+ @Parameter(names = {"--sample"}, description = "Do queries against sample, useful when sample is built using column qualifier")
+ private boolean useSample = false;
+
+ @Parameter(names = {"--sampleCutoff"},
+ description = "Use sample data to determine if a query might return a number of documents over the cutoff. This check is per tablet.")
+ private Integer sampleCutoff = null;
}
- public static List<String> query(BatchScanner bs, List<String> terms) {
+ public static List<String> query(BatchScanner bs, List<String> terms, Integer cutoff) {
Text columns[] = new Text[terms.size()];
int i = 0;
for (String term : terms) {
columns[i++] = new Text(term);
}
- IteratorSetting ii = new IteratorSetting(20, "ii", IntersectingIterator.class);
+
+ IteratorSetting ii;
+
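+ // when a cutoff is given, use the iterator variant that consults the sample before querying the full data set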
+ if (cutoff != null) {
+ ii = new IteratorSetting(20, "ii", CutoffIntersectingIterator.class);
+ CutoffIntersectingIterator.setCutoff(ii, cutoff);
+ } else {
+ ii = new IteratorSetting(20, "ii", IntersectingIterator.class);
+ }
+
IntersectingIterator.setColumnFamilies(ii, columns);
bs.addScanIterator(ii);
bs.setRanges(Collections.singleton(new Range()));
@@ -73,9 +90,15 @@ public class Query {
Connector conn = opts.getConnector();
BatchScanner bs = conn.createBatchScanner(opts.getTableName(), opts.auths, bsOpts.scanThreads);
bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
-
- for (String entry : query(bs, opts.terms))
+ if (opts.useSample) {
+ SamplerConfiguration samplerConfig = conn.tableOperations().getSamplerConfiguration(opts.getTableName());
+ CutoffIntersectingIterator.validateSamplerConfig(samplerConfig);
+ bs.setSamplerConfiguration(samplerConfig);
+ }
+ for (String entry : query(bs, opts.terms, opts.sampleCutoff))
System.out.println(" " + entry);
+
+ bs.close();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index 0d7ade8..d2d6664 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -189,7 +189,7 @@ public class VerifyTabletAssignments {
List<IterInfo> emptyListIterInfo = Collections.emptyList();
List<TColumn> emptyListColumn = Collections.emptyList();
InitialMultiScan is = client.startMultiScan(tinfo, context.rpcCreds(), batch, emptyListColumn, emptyListIterInfo, emptyMapSMapSS,
- Authorizations.EMPTY.getAuthorizationsBB(), false, 0L);
+ Authorizations.EMPTY.getAuthorizationsBB(), false, null, 0L);
if (is.result.more) {
MultiScanResult result = client.continueMultiScan(tinfo, is.scanID);
checkFailures(entry.getKey(), failures, result);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java b/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java
index 7e9543f..1b30530 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java
@@ -21,18 +21,15 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.TreeMap;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.SortedMapIterator;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
import org.apache.hadoop.io.Text;
@@ -104,20 +101,7 @@ public class MetadataBulkLoadFilterTest {
put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/fileA", "2");
TestMetadataBulkLoadFilter iter = new TestMetadataBulkLoadFilter();
- iter.init(new SortedMapIterator(tm1), new HashMap<String,String>(), new IteratorEnvironment() {
-
- @Override
- public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
- return null;
- }
-
- @Override
- public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {}
-
- @Override
- public Authorizations getAuthorizations() {
- return null;
- }
+ iter.init(new SortedMapIterator(tm1), new HashMap<String,String>(), new BaseIteratorEnvironment() {
@Override
public boolean isFullMajorCompaction() {
@@ -128,11 +112,6 @@ public class MetadataBulkLoadFilterTest {
public IteratorScope getIteratorScope() {
return IteratorScope.majc;
}
-
- @Override
- public AccumuloConfiguration getConfig() {
- return null;
- }
});
iter.seek(new Range(), new ArrayList<ByteSequence>(), false);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
index f4d5a9b..26ad8de 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
@@ -24,16 +24,10 @@ import java.util.List;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.IteratorSetting.Column;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.DevNull;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.junit.Assert;
import org.junit.Before;
@@ -52,38 +46,7 @@ public class StatusCombinerTest {
builder = Status.newBuilder();
IteratorSetting cfg = new IteratorSetting(50, StatusCombiner.class);
Combiner.setColumns(cfg, Collections.singletonList(new Column(StatusSection.NAME)));
- combiner.init(new DevNull(), cfg.getOptions(), new IteratorEnvironment() {
-
- @Override
- public AccumuloConfiguration getConfig() {
- return null;
- }
-
- @Override
- public IteratorScope getIteratorScope() {
- return null;
- }
-
- @Override
- public boolean isFullMajorCompaction() {
- return false;
- }
-
- @Override
- public void registerSideChannel(SortedKeyValueIterator<Key,Value> arg0) {
-
- }
-
- @Override
- public Authorizations getAuthorizations() {
- return null;
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String arg0) throws IOException {
- return null;
- }
- });
+ combiner.init(new DevNull(), cfg.getOptions(), null);
}
@Test
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
index 750ad8e..2c46835 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.IteratorSetting.Column;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
@@ -134,4 +135,14 @@ public class NullScanner implements Scanner {
return 0;
}
+ @Override
+ public void setSamplerConfiguration(SamplerConfiguration samplerConfig) {}
+
+ @Override
+ public SamplerConfiguration getSamplerConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void clearSamplerConfiguration() {}
}