You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/09/21 15:51:27 UTC
[2/7] accumulo git commit: ACCUMULO-3913 Added per table sampling
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 1c4676e..2227b25 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -43,6 +44,7 @@ import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -458,7 +460,6 @@ public class FileManager {
this.iflag = flag;
((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
}
-
}
public class ScanFileManager {
@@ -502,7 +503,8 @@ public class FileManager {
return newlyReservedReaders;
}
- public synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
+ public synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable, SamplerConfigurationImpl samplerConfig)
+ throws IOException {
List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
@@ -511,13 +513,22 @@ public class FileManager {
for (FileSKVIterator reader : newlyReservedReaders) {
String filename = getReservedReadeFilename(reader);
InterruptibleIterator iter;
+
+ FileSKVIterator source = reader;
+ if (samplerConfig != null) {
+ source = source.getSample(samplerConfig);
+ if (source == null) {
+ throw new SampleNotPresentException();
+ }
+ }
+
if (detachable) {
- FileDataSource fds = new FileDataSource(filename, reader);
+ FileDataSource fds = new FileDataSource(filename, source);
dataSources.add(fds);
SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds);
iter = new ProblemReportingIterator(context, tablet.getTableId().toString(), filename, continueOnFailure, ssi);
} else {
- iter = new ProblemReportingIterator(context, tablet.getTableId().toString(), filename, continueOnFailure, reader);
+ iter = new ProblemReportingIterator(context, tablet.getTableId().toString(), filename, continueOnFailure, source);
}
DataFileValue value = files.get(new FileRef(filename));
if (value.isTimeSet()) {
@@ -539,7 +550,7 @@ public class FileManager {
fds.unsetIterator();
}
- public synchronized void reattach() throws IOException {
+ public synchronized void reattach(SamplerConfigurationImpl samplerConfig) throws IOException {
if (tabletReservedReaders.size() != 0)
throw new IllegalStateException();
@@ -562,7 +573,14 @@ public class FileManager {
for (FileDataSource fds : dataSources) {
FileSKVIterator reader = map.get(fds.file).remove(0);
- fds.setIterator(reader);
+ FileSKVIterator source = reader;
+ if (samplerConfig != null) {
+ source = source.getSample(samplerConfig);
+ if (source == null) {
+ throw new SampleNotPresentException();
+ }
+ }
+ fds.setIterator(source);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 2274eea..f5141ff 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.tserver;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,8 +35,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
@@ -51,15 +56,20 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.SortedMapIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.system.EmptyIterator;
import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.sample.impl.SamplerFactory;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.PreAllocatedArray;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
@@ -68,7 +78,8 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
public class InMemoryMap {
private SimpleMap map = null;
@@ -80,22 +91,58 @@ public class InMemoryMap {
private Map<String,Set<ByteSequence>> lggroups;
- public InMemoryMap(boolean useNativeMap, String memDumpDir) {
- this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
+ private static Pair<SamplerConfigurationImpl,Sampler> getSampler(AccumuloConfiguration config) {
+ try {
+ SamplerConfigurationImpl sampleConfig = SamplerConfigurationImpl.newSamplerConfig(config);
+ if (sampleConfig == null) {
+ return new Pair<>(null, null);
+ }
+
+ return new Pair<>(sampleConfig, SamplerFactory.newSampler(sampleConfig, config));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
- this.memDumpDir = memDumpDir;
- this.lggroups = lggroups;
+ private AtomicReference<Pair<SamplerConfigurationImpl,Sampler>> samplerRef = new AtomicReference<>(null);
- if (lggroups.size() == 0)
- map = newMap(useNativeMap);
- else
- map = new LocalityGroupMap(lggroups, useNativeMap);
+ private AccumuloConfiguration config;
+
+ // defer creating sampler until first write. This was done because an empty sample map configured with no sampler will not flush after a user changes sample
+ // config.
+ private Sampler getOrCreateSampler() {
+ Pair<SamplerConfigurationImpl,Sampler> pair = samplerRef.get();
+ if (pair == null) {
+ pair = getSampler(config);
+ if (!samplerRef.compareAndSet(null, pair)) {
+ pair = samplerRef.get();
+ }
+ }
+
+ return pair.getSecond();
}
public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
- this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
+
+ boolean useNativeMap = config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED);
+
+ this.memDumpDir = config.get(Property.TSERV_MEMDUMP_DIR);
+ this.lggroups = LocalityGroupUtil.getLocalityGroups(config);
+
+ this.config = config;
+
+ SimpleMap allMap;
+ SimpleMap sampleMap;
+
+ if (lggroups.size() == 0) {
+ allMap = newMap(useNativeMap);
+ sampleMap = newMap(useNativeMap);
+ } else {
+ allMap = new LocalityGroupMap(lggroups, useNativeMap);
+ sampleMap = new LocalityGroupMap(lggroups, useNativeMap);
+ }
+
+ map = new SampleMap(allMap, sampleMap);
}
private static SimpleMap newMap(boolean useNativeMap) {
@@ -117,7 +164,7 @@ public class InMemoryMap {
int size();
- InterruptibleIterator skvIterator();
+ InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig);
void delete();
@@ -126,6 +173,95 @@ public class InMemoryMap {
void mutate(List<Mutation> mutations, int kvCount);
}
+ private class SampleMap implements SimpleMap {
+
+ private SimpleMap map;
+ private SimpleMap sample;
+
+ public SampleMap(SimpleMap map, SimpleMap sampleMap) {
+ this.map = map;
+ this.sample = sampleMap;
+ }
+
+ @Override
+ public Value get(Key key) {
+ return map.get(key);
+ }
+
+ @Override
+ public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig) {
+ if (samplerConfig == null)
+ return map.skvIterator(null);
+ else {
+ Pair<SamplerConfigurationImpl,Sampler> samplerAndConf = samplerRef.get();
+ if (samplerAndConf == null) {
+ return EmptyIterator.EMPTY_ITERATOR;
+ } else if (samplerAndConf.getFirst() != null && samplerAndConf.getFirst().equals(samplerConfig)) {
+ return sample.skvIterator(null);
+ } else {
+ throw new SampleNotPresentException();
+ }
+ }
+ }
+
+ @Override
+ public void delete() {
+ map.delete();
+ sample.delete();
+ }
+
+ @Override
+ public long getMemoryUsed() {
+ return map.getMemoryUsed() + sample.getMemoryUsed();
+ }
+
+ @Override
+ public void mutate(List<Mutation> mutations, int kvCount) {
+ map.mutate(mutations, kvCount);
+
+ Sampler sampler = getOrCreateSampler();
+ if (sampler != null) {
+ List<Mutation> sampleMutations = null;
+
+ for (Mutation m : mutations) {
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ List<ColumnUpdate> sampleColUpdates = null;
+ for (ColumnUpdate cvp : colUpdates) {
+ Key k = new Key(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(), false);
+ if (sampler.accept(k)) {
+ if (sampleColUpdates == null) {
+ sampleColUpdates = new ArrayList<>();
+ }
+ sampleColUpdates.add(cvp);
+ }
+ }
+
+ if (sampleColUpdates != null) {
+ if (sampleMutations == null) {
+ sampleMutations = new ArrayList<>();
+ }
+
+ sampleMutations.add(new LocalityGroupUtil.PartitionedMutation(m.getRow(), sampleColUpdates));
+ }
+ }
+
+ if (sampleMutations != null) {
+ sample.mutate(sampleMutations, kvCount);
+ }
+ }
+ }
+ }
+
private static class LocalityGroupMap implements SimpleMap {
private PreAllocatedArray<Map<ByteSequence,MutableLong>> groupFams;
@@ -181,13 +317,16 @@ public class InMemoryMap {
}
@Override
- public InterruptibleIterator skvIterator() {
+ public InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig) {
+ if (samplerConfig != null)
+ throw new SampleNotPresentException();
+
LocalityGroup groups[] = new LocalityGroup[maps.length];
for (int i = 0; i < groups.length; i++) {
if (i < groupFams.length)
- groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams.get(i), false);
+ groups[i] = new LocalityGroup(maps[i].skvIterator(null), groupFams.get(i), false);
else
- groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
+ groups[i] = new LocalityGroup(maps[i].skvIterator(null), null, true);
}
return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
@@ -264,7 +403,9 @@ public class InMemoryMap {
}
@Override
- public synchronized InterruptibleIterator skvIterator() {
+ public InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig) {
+ if (samplerConfig != null)
+ throw new SampleNotPresentException();
if (map == null)
throw new IllegalStateException();
@@ -327,7 +468,9 @@ public class InMemoryMap {
}
@Override
- public InterruptibleIterator skvIterator() {
+ public InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig) {
+ if (samplerConfig != null)
+ throw new SampleNotPresentException();
return (InterruptibleIterator) nativeMap.skvIterator();
}
@@ -410,16 +553,30 @@ public class InMemoryMap {
private MemoryDataSource parent;
private IteratorEnvironment env;
private AtomicBoolean iflag;
+ private SamplerConfigurationImpl iteratorSamplerConfig;
+
+ private SamplerConfigurationImpl getSamplerConfig() {
+ if (env != null) {
+ if (env.isSamplingEnabled()) {
+ return new SamplerConfigurationImpl(env.getSamplerConfiguration());
+ } else {
+ return null;
+ }
+ } else {
+ return iteratorSamplerConfig;
+ }
+ }
- MemoryDataSource() {
- this(null, false, null, null);
+ MemoryDataSource(SamplerConfigurationImpl samplerConfig) {
+ this(null, false, null, null, samplerConfig);
}
- public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag) {
+ public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag, SamplerConfigurationImpl samplerConfig) {
this.parent = parent;
this.switched = switched;
this.env = env;
this.iflag = iflag;
+ this.iteratorSamplerConfig = samplerConfig;
}
@Override
@@ -457,6 +614,10 @@ public class InMemoryMap {
reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, SiteConfiguration.getInstance());
if (iflag != null)
reader.setInterruptFlag(iflag);
+
+ if (getSamplerConfig() != null) {
+ reader = reader.getSample(getSamplerConfig());
+ }
}
return reader;
@@ -466,7 +627,7 @@ public class InMemoryMap {
public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
if (iter == null)
if (!switched) {
- iter = map.skvIterator();
+ iter = map.skvIterator(getSamplerConfig());
if (iflag != null)
iter.setInterruptFlag(iflag);
} else {
@@ -485,7 +646,7 @@ public class InMemoryMap {
@Override
public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
- return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag);
+ return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag, iteratorSamplerConfig);
}
@Override
@@ -562,7 +723,7 @@ public class InMemoryMap {
}
- public synchronized MemoryIterator skvIterator() {
+ public synchronized MemoryIterator skvIterator(SamplerConfigurationImpl iteratorSamplerConfig) {
if (map == null)
throw new NullPointerException();
@@ -570,8 +731,9 @@ public class InMemoryMap {
throw new IllegalStateException("Can not obtain iterator after map deleted");
int mc = kvCount.get();
- MemoryDataSource mds = new MemoryDataSource();
- SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
+ MemoryDataSource mds = new MemoryDataSource(iteratorSamplerConfig);
+ // TODO seems like a bug that two MemoryDataSources are created... may need to fix in older branches
+ SourceSwitchingIterator ssi = new SourceSwitchingIterator(mds);
MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
mi.setSSI(ssi);
mi.setMDS(mds);
@@ -584,7 +746,7 @@ public class InMemoryMap {
if (nextKVCount.get() - 1 != kvCount.get())
throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = " + kvCount.get());
- return map.skvIterator();
+ return map.skvIterator(null);
}
private boolean deleted = false;
@@ -615,9 +777,15 @@ public class InMemoryMap {
Configuration newConf = new Configuration(conf);
newConf.setInt("io.seqfile.compress.blocksize", 100000);
- FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, SiteConfiguration.getInstance());
+ AccumuloConfiguration siteConf = SiteConfiguration.getInstance();
- InterruptibleIterator iter = map.skvIterator();
+ if (getOrCreateSampler() != null) {
+ siteConf = createSampleConfig(siteConf);
+ }
+
+ FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, siteConf);
+
+ InterruptibleIterator iter = map.skvIterator(null);
HashSet<ByteSequence> allfams = new HashSet<ByteSequence>();
@@ -668,14 +836,28 @@ public class InMemoryMap {
tmpMap.delete();
}
+ private AccumuloConfiguration createSampleConfig(AccumuloConfiguration siteConf) {
+ ConfigurationCopy confCopy = new ConfigurationCopy(Iterables.filter(siteConf, new Predicate<Entry<String,String>>() {
+ @Override
+ public boolean apply(Entry<String,String> input) {
+ return !input.getKey().startsWith(Property.TABLE_SAMPLER.getKey());
+ }
+ }));
+
+ for (Entry<String,String> entry : samplerRef.get().getFirst().toTablePropertiesMap().entrySet()) {
+ confCopy.set(entry.getKey(), entry.getValue());
+ }
+
+ siteConf = confCopy;
+ return siteConf;
+ }
+
private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
while (iter.hasTop() && activeIters.size() > 0) {
// RFile does not support MemKey, so we move the kv count into the value only for the RFile.
// There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
- Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
- out.append(iter.getTopKey(), newValue);
+ out.append(iter.getTopKey(), MemValue.encode(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount));
iter.next();
-
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java
index 00c8be9..71a4cbd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java
@@ -61,10 +61,10 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible
currVal = v;
return;
}
- currVal = new Value(v);
- int mc = MemValue.splitKVCount(currVal);
- currKey = new MemKey(k, mc);
+ MemValue mv = MemValue.decode(v);
+ currVal = mv.value;
+ currKey = new MemKey(k, mv.kvCount);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
index bc44459..af6f2f1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
@@ -16,69 +16,38 @@
*/
package org.apache.accumulo.tserver;
-import java.io.DataOutput;
-import java.io.IOException;
-
import org.apache.accumulo.core.data.Value;
/**
*
*/
-public class MemValue extends Value {
- int kvCount;
- boolean merged = false;
+public class MemValue {
- public MemValue() {
- super();
- this.kvCount = Integer.MAX_VALUE;
- }
+ Value value;
+ int kvCount;
public MemValue(Value value, int kv) {
- super(value);
+ this.value = value;
this.kvCount = kv;
}
- // Override
- @Override
- public void write(final DataOutput out) throws IOException {
- if (!merged) {
- byte[] combinedBytes = new byte[getSize() + 4];
- System.arraycopy(value, 0, combinedBytes, 4, getSize());
- combinedBytes[0] = (byte) (kvCount >>> 24);
- combinedBytes[1] = (byte) (kvCount >>> 16);
- combinedBytes[2] = (byte) (kvCount >>> 8);
- combinedBytes[3] = (byte) (kvCount);
- value = combinedBytes;
- merged = true;
- }
- super.write(out);
- }
-
- @Override
- public void set(final byte[] b) {
- super.set(b);
- merged = false;
- }
-
- @Override
- public void copy(byte[] b) {
- super.copy(b);
- merged = false;
+ public static Value encode(Value value, int kv) {
+ byte[] combinedBytes = new byte[value.getSize() + 4];
+ System.arraycopy(value.get(), 0, combinedBytes, 4, value.getSize());
+ combinedBytes[0] = (byte) (kv >>> 24);
+ combinedBytes[1] = (byte) (kv >>> 16);
+ combinedBytes[2] = (byte) (kv >>> 8);
+ combinedBytes[3] = (byte) (kv);
+ return new Value(combinedBytes);
}
- /**
- * Takes a Value and will take out the embedded kvCount, and then return that value while replacing the Value with the original unembedded version
- *
- * @return The kvCount embedded in v.
- */
- public static int splitKVCount(Value v) {
- if (v instanceof MemValue)
- return ((MemValue) v).kvCount;
-
+ public static MemValue decode(Value v) {
byte[] originalBytes = new byte[v.getSize() - 4];
byte[] combined = v.get();
System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length);
v.set(originalBytes);
- return (combined[0] << 24) + ((combined[1] & 0xFF) << 16) + ((combined[2] & 0xFF) << 8) + (combined[3] & 0xFF);
+ int kv = (combined[0] << 24) + ((combined[1] & 0xFF) << 16) + ((combined[2] & 0xFF) << 8) + (combined[3] & 0xFF);
+
+ return new MemValue(new Value(originalBytes), kv);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
index cf01dd3..3cb4d40 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
@@ -34,6 +34,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
@@ -749,6 +750,9 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ if (env != null && env.isSamplingEnabled()) {
+ throw new SampleNotPresentException();
+ }
return new NMSKVIter(map, interruptFlag);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
index 6c5b63d..73adec3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -29,6 +31,7 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.MultiIterator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.tserver.FileManager.ScanFileManager;
@@ -40,10 +43,12 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
private final IteratorScope scope;
private final boolean fullMajorCompaction;
private final AccumuloConfiguration config;
- private final ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+ private final ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators;
private Map<FileRef,DataFileValue> files;
private final Authorizations authorizations; // these will only be supplied during scan scope
+ private SamplerConfiguration samplerConfig;
+ private boolean enableSampleForDeepCopy;
public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) {
if (scope == IteratorScope.majc)
@@ -54,10 +59,11 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
this.config = config;
this.fullMajorCompaction = false;
this.authorizations = Authorizations.EMPTY;
+ this.topLevelIterators = new ArrayList<>();
}
- public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files,
- Authorizations authorizations) {
+ private TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files,
+ Authorizations authorizations, SamplerConfigurationImpl samplerConfig, ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators) {
if (scope == IteratorScope.majc)
throw new IllegalArgumentException("must set if compaction is full");
@@ -67,6 +73,19 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
this.fullMajorCompaction = false;
this.files = files;
this.authorizations = authorizations;
+ if (samplerConfig != null) {
+ enableSampleForDeepCopy = true;
+ this.samplerConfig = samplerConfig.toSamplerConfiguration();
+ } else {
+ enableSampleForDeepCopy = false;
+ }
+
+ this.topLevelIterators = topLevelIterators;
+ }
+
+ public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files,
+ Authorizations authorizations, SamplerConfigurationImpl samplerConfig) {
+ this(scope, config, trm, files, authorizations, samplerConfig, new ArrayList<SortedKeyValueIterator<Key,Value>>());
}
public TabletIteratorEnvironment(IteratorScope scope, boolean fullMajC, AccumuloConfiguration config) {
@@ -78,6 +97,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
this.config = config;
this.fullMajorCompaction = fullMajC;
this.authorizations = Authorizations.EMPTY;
+ this.topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
}
@Override
@@ -100,7 +120,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
@Override
public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
FileRef ref = new FileRef(mapFileName, new Path(mapFileName));
- return trm.openFiles(Collections.singletonMap(ref, files.get(ref)), false).get(0);
+ return trm.openFiles(Collections.singletonMap(ref, files.get(ref)), false, null).get(0);
}
@Override
@@ -122,4 +142,37 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
allIters.add(iter);
return new MultiIterator(allIters, false);
}
+
+ @Override
+ public boolean isSamplingEnabled() {
+ return enableSampleForDeepCopy;
+ }
+
+ @Override
+ public SamplerConfiguration getSamplerConfiguration() {
+ if (samplerConfig == null) {
+ // only create this once so that it stays the same, even if config changes
+ SamplerConfigurationImpl sci = SamplerConfigurationImpl.newSamplerConfig(config);
+ if (sci == null) {
+ return null;
+ }
+ samplerConfig = sci.toSamplerConfiguration();
+ }
+ return samplerConfig;
+ }
+
+ @Override
+ public IteratorEnvironment cloneWithSamplingEnabled() {
+ if (!scope.equals(IteratorScope.scan)) {
+ throw new UnsupportedOperationException();
+ }
+
+ SamplerConfigurationImpl sci = SamplerConfigurationImpl.newSamplerConfig(config);
+ if (sci == null) {
+ throw new SampleNotPresentException();
+ }
+
+ TabletIteratorEnvironment te = new TabletIteratorEnvironment(scope, config, trm, files, authorizations, sci, topLevelIterators);
+ return te;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index de89b50..d35e6af 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -61,6 +61,7 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.client.impl.CompressedIterators;
import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
import org.apache.accumulo.core.client.impl.DurabilityImpl;
@@ -114,6 +115,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -123,6 +125,8 @@ import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -447,8 +451,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
- long readaheadThreshold, long batchTimeOut) throws NotServingTabletException, ThriftSecurityException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+ long readaheadThreshold, TSamplerConfiguration tSamplerConfig, long batchTimeOut) throws NotServingTabletException, ThriftSecurityException,
+ org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException {
String tableId = new String(textent.getTable(), UTF_8);
if (!security.canScan(credentials, tableId, Tables.getNamespaceId(getInstance(), tableId), range, columns, ssiList, ssio, authorizations))
@@ -480,10 +484,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
for (TColumn tcolumn : columns) {
columnSet.add(new Column(tcolumn));
}
+
final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio, new Authorizations(authorizations), readaheadThreshold,
batchTimeOut);
scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
- scanSession.interruptFlag, scanSession.batchTimeOut);
+ scanSession.interruptFlag, SamplerConfigurationImpl.fromThrift(tSamplerConfig), scanSession.batchTimeOut);
long sid = sessionManager.createSession(scanSession, true);
@@ -502,7 +507,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+ org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException {
ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID);
if (scanSession == null) {
throw new NoSuchScanIDException();
@@ -516,7 +521,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+ org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException {
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
@@ -533,6 +538,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
throw (NotServingTabletException) e.getCause();
else if (e.getCause() instanceof TooManyFilesException)
throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift());
+ else if (e.getCause() instanceof SampleNotPresentException)
+ throw new TSampleNotPresentException(scanSession.extent.toThrift());
else if (e.getCause() instanceof IOException) {
sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
List<KVEntry> empty = Collections.emptyList();
@@ -595,8 +602,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
- List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut)
- throws ThriftSecurityException {
+ List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
+ TSamplerConfiguration tSamplerConfig, long batchTimeOut) throws ThriftSecurityException, TSampleNotPresentException {
// find all of the tables that need to be scanned
final HashSet<String> tables = new HashSet<String>();
for (TKeyExtent keyExtent : tbatch.keySet()) {
@@ -627,7 +634,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(batch.keySet()));
- final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch, ssiList, ssio, new Authorizations(authorizations), batchTimeOut);
+ final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch, ssiList, ssio, new Authorizations(authorizations),
+ SamplerConfigurationImpl.fromThrift(tSamplerConfig), batchTimeOut);
mss.numTablets = batch.size();
for (List<Range> ranges : batch.values()) {
@@ -653,7 +661,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
@Override
- public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+ public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, TSampleNotPresentException {
MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
@@ -668,7 +676,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
}
- private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
+ private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException, TSampleNotPresentException {
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(TabletServer.this, scanID);
@@ -679,6 +687,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
session.lookupTask = null;
return scanResult;
+ } catch (ExecutionException e) {
+ sessionManager.removeSession(scanID);
+ if (e.getCause() instanceof SampleNotPresentException) {
+ throw new TSampleNotPresentException();
+ } else {
+ log.warn("Failed to get multiscan result", e);
+ throw new RuntimeException(e);
+ }
} catch (TimeoutException e1) {
long timeout = TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
sessionManager.removeIfNotAccessed(scanID, timeout);
@@ -1116,7 +1132,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
IterConfig ic = compressedIters.decompress(tc.iterators);
- Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag, 0);
+ Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag, null, 0);
try {
ScanBatch batch = scanner.read();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
index b97b88b..04915ef 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java
@@ -26,7 +26,10 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.accumulo.core.compaction.CompactionSettings;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.tserver.compaction.CompactionPlan;
import org.apache.accumulo.tserver.compaction.CompactionStrategy;
@@ -40,6 +43,22 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy {
boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request);
}
+ private static class NoSampleTest implements Test {
+
+ @Override
+ public boolean shouldCompact(Entry<FileRef,DataFileValue> file, MajorCompactionRequest request) {
+ try (FileSKVIterator reader = request.openReader(file.getKey())) {
+ SamplerConfigurationImpl sc = SamplerConfigurationImpl.newSamplerConfig(new ConfigurationCopy(request.getTableProperties()));
+ if (sc == null) {
+ return false;
+ }
+ return reader.getSample(sc) == null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
private static abstract class FileSizeTest implements Test {
private final long esize;
@@ -83,6 +102,9 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy {
for (Entry<String,String> entry : es) {
switch (CompactionSettings.valueOf(entry.getKey())) {
+ case SF_NO_SAMPLE:
+ tests.add(new NoSampleTest());
+ break;
case SF_LT_ESIZE_OPT:
tests.add(new FileSizeTest(entry.getValue()) {
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index 57a09ce..2d745cb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.client.impl.Translator;
import org.apache.accumulo.core.client.impl.Translators;
import org.apache.accumulo.core.conf.Property;
@@ -111,7 +112,7 @@ public class LookupTask extends ScanTask<MultiScanResult> {
interruptFlag.set(true);
lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList, session.ssio,
- interruptFlag, session.batchTimeOut);
+ interruptFlag, session.samplerConfig, session.batchTimeOut);
// if the tablet was closed it is possible that the
// interrupt flag was set.... do not want it set for
@@ -163,6 +164,8 @@ public class LookupTask extends ScanTask<MultiScanResult> {
log.warn("Iteration interrupted, when scan not cancelled", iie);
addResult(iie);
}
+ } catch (SampleNotPresentException e) {
+ addResult(e);
} catch (Throwable e) {
log.warn("exception while doing multi-scan ", e);
addResult(e);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index e3f4146..ec28367 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.tserver.scan;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.tserver.TabletServer;
@@ -84,8 +85,8 @@ public class NextBatchTask extends ScanTask<ScanBatch> {
log.warn("Iteration interrupted, when scan not cancelled", iie);
addResult(iie);
}
- } catch (TooManyFilesException tmfe) {
- addResult(tmfe);
+ } catch (TooManyFilesException | SampleNotPresentException e) {
+ addResult(e);
} catch (OutOfMemoryError ome) {
Halt.halt("Ran out of memory scanning " + scanSession.extent + " for " + scanSession.client);
addResult(ome);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index fccac47..16fc218 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.impl.KeyExtent;
@@ -36,6 +37,7 @@ public class MultiScanSession extends Session {
public final List<IterInfo> ssiList;
public final Map<String,Map<String,String>> ssio;
public final Authorizations auths;
+ public final SamplerConfiguration samplerConfig;
public final long batchTimeOut;
// stats
@@ -47,13 +49,14 @@ public class MultiScanSession extends Session {
public volatile ScanTask<MultiScanResult> lookupTask;
public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent, Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, Authorizations authorizations, long batchTimeOut) {
+ Map<String,Map<String,String>> ssio, Authorizations authorizations, SamplerConfiguration samplerConfig, long batchTimeOut) {
super(credentials);
this.queries = queries;
this.ssiList = ssiList;
this.ssio = ssio;
this.auths = authorizations;
this.threadPoolExtent = threadPoolExtent;
+ this.samplerConfig = samplerConfig;
this.batchTimeOut = batchTimeOut;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 853714a..72c289c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -42,6 +43,7 @@ import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSou
import org.apache.accumulo.core.iterators.system.StatsIterator;
import org.apache.accumulo.core.iterators.system.VisibilityFilter;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.fs.FileRef;
@@ -50,6 +52,8 @@ import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
import org.apache.accumulo.tserver.TabletIteratorEnvironment;
import org.apache.accumulo.tserver.TabletServer;
+import com.google.common.collect.Iterables;
+
class ScanDataSource implements DataSource {
// data source state
@@ -65,10 +69,10 @@ class ScanDataSource implements DataSource {
private final ScanOptions options;
ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, long batchTimeOut) {
+ Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, SamplerConfiguration samplerConfig, long batchTimeOut) {
this.tablet = tablet;
expectedDeletionCount = tablet.getDataSourceDeletions();
- this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false, batchTimeOut);
+ this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false, samplerConfig, batchTimeOut);
this.interruptFlag = interruptFlag;
}
@@ -117,6 +121,8 @@ class ScanDataSource implements DataSource {
Map<FileRef,DataFileValue> files;
+ SamplerConfigurationImpl samplerConfig = options.getSamplerConfigurationImpl();
+
synchronized (tablet) {
if (memIters != null)
@@ -141,26 +147,26 @@ class ScanDataSource implements DataSource {
// getIterators() throws an exception
expectedDeletionCount = tablet.getDataSourceDeletions();
- memIters = tablet.getTabletMemory().getIterators();
+ memIters = tablet.getTabletMemory().getIterators(samplerConfig);
Pair<Long,Map<FileRef,DataFileValue>> reservation = tablet.getDatafileManager().reserveFilesForScan();
fileReservationId = reservation.getFirst();
files = reservation.getSecond();
}
- Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isIsolated());
+ Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isIsolated(), samplerConfig);
+
+ for (SortedKeyValueIterator<Key,Value> skvi : Iterables.concat(mapfiles, memIters))
+ ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
iters.addAll(mapfiles);
iters.addAll(memIters);
- for (SortedKeyValueIterator<Key,Value> skvi : iters)
- ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
-
MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent());
TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, tablet.getTableConfiguration(), fileManager, files,
- options.getAuthorizations());
+ options.getAuthorizations(), samplerConfig);
statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter());
@@ -212,7 +218,7 @@ class ScanDataSource implements DataSource {
public void reattachFileManager() throws IOException {
if (fileManager != null)
- fileManager.reattach();
+ fileManager.reattach(options.getSamplerConfigurationImpl());
}
public void detachFileManager() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
index 2a38fbd..c97f3ac 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
@@ -21,8 +21,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
final class ScanOptions {
@@ -35,10 +37,11 @@ final class ScanOptions {
private final AtomicBoolean interruptFlag;
private final int num;
private final boolean isolated;
+ private SamplerConfiguration samplerConfig;
private final long batchTimeOut;
ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
- AtomicBoolean interruptFlag, boolean isolated, long batchTimeOut) {
+ AtomicBoolean interruptFlag, boolean isolated, SamplerConfiguration samplerConfig, long batchTimeOut) {
this.num = num;
this.authorizations = authorizations;
this.defaultLabels = defaultLabels;
@@ -47,6 +50,7 @@ final class ScanOptions {
this.ssio = ssio;
this.interruptFlag = interruptFlag;
this.isolated = isolated;
+ this.samplerConfig = samplerConfig;
this.batchTimeOut = batchTimeOut;
}
@@ -82,6 +86,16 @@ final class ScanOptions {
return isolated;
}
+ public SamplerConfiguration getSamplerConfiguration() {
+ return samplerConfig;
+ }
+
+ public SamplerConfigurationImpl getSamplerConfigurationImpl() {
+ if (samplerConfig == null)
+ return null;
+ return new SamplerConfigurationImpl(samplerConfig);
+ }
+
public long getBatchTimeOut() {
return batchTimeOut;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index b8c260d..1f66302 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -51,6 +51,7 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
import org.apache.accumulo.core.client.impl.DurabilityImpl;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -632,7 +633,8 @@ public class Tablet implements TabletCommitter {
}
public LookupResult lookup(List<Range> ranges, HashSet<Column> columns, Authorizations authorizations, List<KVEntry> results, long maxResultSize,
- List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, long batchTimeOut) throws IOException {
+ List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, SamplerConfiguration samplerConfig, long batchTimeOut)
+ throws IOException {
if (ranges.size() == 0) {
return new LookupResult();
@@ -650,7 +652,8 @@ public class Tablet implements TabletCommitter {
tabletRange.clip(range);
}
- ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, batchTimeOut);
+ ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, samplerConfig,
+ batchTimeOut);
LookupResult result = null;
@@ -754,12 +757,13 @@ public class Tablet implements TabletCommitter {
}
public Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag, long batchTimeOut) {
+ Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag, SamplerConfiguration samplerConfig, long batchTimeOut) {
// do a test to see if this range falls within the tablet, if it does not
// then clip will throw an exception
extent.toDataRange().clip(range);
- ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated, batchTimeOut);
+ ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated, samplerConfig,
+ batchTimeOut);
return new Scanner(this, range, opts);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
index 0b39d40..86cc262 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.tserver.InMemoryMap;
import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
@@ -156,11 +157,11 @@ class TabletMemory implements Closeable {
tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
}
- public List<MemoryIterator> getIterators() {
+ public List<MemoryIterator> getIterators(SamplerConfigurationImpl samplerConfig) {
List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
- toReturn.add(memTable.skvIterator());
+ toReturn.add(memTable.skvIterator(samplerConfig));
if (otherMemTable != null)
- toReturn.add(otherMemTable.skvIterator());
+ toReturn.add(otherMemTable.skvIterator(samplerConfig));
return toReturn;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
index da7157a..7b4d447 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
@@ -26,16 +26,22 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -45,21 +51,56 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.sample.Sampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.sample.impl.SamplerFactory;
import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ZooConfiguration;
import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import com.google.common.collect.ImmutableMap;
+
public class InMemoryMapTest {
+ private static class SampleIE extends BaseIteratorEnvironment {
+
+ private final SamplerConfiguration sampleConfig;
+
+ public SampleIE() {
+ this.sampleConfig = null;
+ }
+
+ public SampleIE(SamplerConfigurationImpl sampleConfig) {
+ this.sampleConfig = sampleConfig.toSamplerConfiguration();
+ }
+
+ @Override
+ public boolean isSamplingEnabled() {
+ return sampleConfig != null;
+ }
+
+ @Override
+ public SamplerConfiguration getSamplerConfiguration() {
+ return sampleConfig;
+ }
+ }
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
@BeforeClass
public static void setUp() throws Exception {
// suppress log messages having to do with not having an instance
@@ -101,20 +142,42 @@ public class InMemoryMapTest {
}
static Set<ByteSequence> newCFSet(String... cfs) {
- HashSet<ByteSequence> cfSet = new HashSet<ByteSequence>();
+ HashSet<ByteSequence> cfSet = new HashSet<>();
for (String cf : cfs) {
cfSet.add(new ArrayByteSequence(cf));
}
return cfSet;
}
+ static Set<Text> toTextSet(String... cfs) {
+ HashSet<Text> cfSet = new HashSet<>();
+ for (String cf : cfs) {
+ cfSet.add(new Text(cf));
+ }
+ return cfSet;
+ }
+
+ static ConfigurationCopy newConfig(String memDumpDir) {
+ ConfigurationCopy config = new ConfigurationCopy(DefaultConfiguration.getInstance());
+ config.set(Property.TSERV_NATIVEMAP_ENABLED, "" + false);
+ config.set(Property.TSERV_MEMDUMP_DIR, memDumpDir);
+ return config;
+ }
+
+ static InMemoryMap newInMemoryMap(boolean useNative, String memDumpDir) throws LocalityGroupConfigurationError {
+ ConfigurationCopy config = new ConfigurationCopy(DefaultConfiguration.getInstance());
+ config.set(Property.TSERV_NATIVEMAP_ENABLED, "" + useNative);
+ config.set(Property.TSERV_MEMDUMP_DIR, memDumpDir);
+ return new InMemoryMap(config);
+ }
+
@Test
public void test2() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
- MemoryIterator ski1 = imm.skvIterator();
+ MemoryIterator ski1 = imm.skvIterator(null);
mutate(imm, "r1", "foo:cq1", 3, "bar1");
- MemoryIterator ski2 = imm.skvIterator();
+ MemoryIterator ski2 = imm.skvIterator(null);
ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
assertFalse(ski1.hasTop());
@@ -128,17 +191,17 @@ public class InMemoryMapTest {
@Test
public void test3() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq1", 3, "bar2");
- MemoryIterator ski1 = imm.skvIterator();
+ MemoryIterator ski1 = imm.skvIterator(null);
mutate(imm, "r1", "foo:cq1", 3, "bar3");
mutate(imm, "r3", "foo:cq1", 3, "bar9");
mutate(imm, "r3", "foo:cq1", 3, "bara");
- MemoryIterator ski2 = imm.skvIterator();
+ MemoryIterator ski2 = imm.skvIterator(null);
ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
ae(ski1, "r1", "foo:cq1", 3, "bar2");
@@ -154,11 +217,11 @@ public class InMemoryMapTest {
@Test
public void test4() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq1", 3, "bar2");
- MemoryIterator ski1 = imm.skvIterator();
+ MemoryIterator ski1 = imm.skvIterator(null);
mutate(imm, "r1", "foo:cq1", 3, "bar3");
imm.delete(0);
@@ -186,13 +249,13 @@ public class InMemoryMapTest {
@Test
public void test5() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq1", 3, "bar2");
mutate(imm, "r1", "foo:cq1", 3, "bar3");
- MemoryIterator ski1 = imm.skvIterator();
+ MemoryIterator ski1 = imm.skvIterator(null);
ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
ae(ski1, "r1", "foo:cq1", 3, "bar3");
@@ -204,13 +267,13 @@ public class InMemoryMapTest {
ski1.close();
- imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq2", 3, "bar2");
mutate(imm, "r1", "foo:cq3", 3, "bar3");
- ski1 = imm.skvIterator();
+ ski1 = imm.skvIterator(null);
ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
ae(ski1, "r1", "foo:cq1", 3, "bar1");
@@ -225,18 +288,18 @@ public class InMemoryMapTest {
@Test
public void test6() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq2", 3, "bar2");
mutate(imm, "r1", "foo:cq3", 3, "bar3");
mutate(imm, "r1", "foo:cq4", 3, "bar4");
- MemoryIterator ski1 = imm.skvIterator();
+ MemoryIterator ski1 = imm.skvIterator(null);
mutate(imm, "r1", "foo:cq5", 3, "bar5");
- SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);
+ SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(new SampleIE());
ski1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
ae(ski1, "r1", "foo:cq1", 3, "bar1");
@@ -271,12 +334,12 @@ public class InMemoryMapTest {
private void deepCopyAndDelete(int interleaving, boolean interrupt) throws Exception {
// interleaving == 0 intentionally omitted, this runs the test w/o deleting in mem map
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq2", 3, "bar2");
- MemoryIterator ski1 = imm.skvIterator();
+ MemoryIterator ski1 = imm.skvIterator(null);
AtomicBoolean iflag = new AtomicBoolean(false);
ski1.setInterruptFlag(iflag);
@@ -287,7 +350,7 @@ public class InMemoryMapTest {
iflag.set(true);
}
- SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);
+ SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(new SampleIE());
if (interleaving == 2) {
imm.delete(0);
@@ -338,7 +401,7 @@ public class InMemoryMapTest {
@Test
public void testBug1() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
for (int i = 0; i < 20; i++) {
mutate(imm, "r1", "foo:cq" + i, 3, "bar" + i);
@@ -348,7 +411,7 @@ public class InMemoryMapTest {
mutate(imm, "r2", "foo:cq" + i, 3, "bar" + i);
}
- MemoryIterator ski1 = imm.skvIterator();
+ MemoryIterator ski1 = imm.skvIterator(null);
ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(ski1);
imm.delete(0);
@@ -366,14 +429,14 @@ public class InMemoryMapTest {
@Test
public void testSeekBackWards() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq2", 3, "bar2");
mutate(imm, "r1", "foo:cq3", 3, "bar3");
mutate(imm, "r1", "foo:cq4", 3, "bar4");
- MemoryIterator skvi1 = imm.skvIterator();
+ MemoryIterator skvi1 = imm.skvIterator(null);
skvi1.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
ae(skvi1, "r1", "foo:cq3", 3, "bar3");
@@ -385,14 +448,14 @@ public class InMemoryMapTest {
@Test
public void testDuplicateKey() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
Mutation m = new Mutation(new Text("r1"));
m.put(new Text("foo"), new Text("cq"), 3, new Value("v1".getBytes()));
m.put(new Text("foo"), new Text("cq"), 3, new Value("v2".getBytes()));
imm.mutate(Collections.singletonList(m));
- MemoryIterator skvi1 = imm.skvIterator();
+ MemoryIterator skvi1 = imm.skvIterator(null);
skvi1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
ae(skvi1, "r1", "foo:cq", 3, "v2");
ae(skvi1, "r1", "foo:cq", 3, "v1");
@@ -410,12 +473,12 @@ public class InMemoryMapTest {
// - hard to get this timing test to run well on apache build machines
@Test
@Ignore
- public void parallelWriteSpeed() throws InterruptedException, IOException {
+ public void parallelWriteSpeed() throws Exception {
List<Double> timings = new ArrayList<Double>();
for (int threads : new int[] {1, 2, 16, /* 64, 256 */}) {
final long now = System.currentTimeMillis();
final long counts[] = new long[threads];
- final InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ final InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
ExecutorService e = Executors.newFixedThreadPool(threads);
for (int j = 0; j < threads; j++) {
final int threadId = j;
@@ -451,12 +514,12 @@ public class InMemoryMapTest {
@Test
public void testLocalityGroups() throws Exception {
+ ConfigurationCopy config = newConfig(tempFolder.newFolder().getAbsolutePath());
+ config.set(Property.TABLE_LOCALITY_GROUP_PREFIX + "lg1", LocalityGroupUtil.encodeColumnFamilies(toTextSet("cf1", "cf2")));
+ config.set(Property.TABLE_LOCALITY_GROUP_PREFIX + "lg2", LocalityGroupUtil.encodeColumnFamilies(toTextSet("cf3", "cf4")));
+ config.set(Property.TABLE_LOCALITY_GROUPS.getKey(), "lg1,lg2");
- Map<String,Set<ByteSequence>> lggroups1 = new HashMap<String,Set<ByteSequence>>();
- lggroups1.put("lg1", newCFSet("cf1", "cf2"));
- lggroups1.put("lg2", newCFSet("cf3", "cf4"));
-
- InMemoryMap imm = new InMemoryMap(lggroups1, false, tempFolder.newFolder().getAbsolutePath());
+ InMemoryMap imm = new InMemoryMap(config);
Mutation m1 = new Mutation("r1");
m1.put("cf1", "x", 2, "1");
@@ -480,10 +543,10 @@ public class InMemoryMapTest {
imm.mutate(Arrays.asList(m1, m2, m3, m4, m5));
- MemoryIterator iter1 = imm.skvIterator();
+ MemoryIterator iter1 = imm.skvIterator(null);
seekLocalityGroups(iter1);
- SortedKeyValueIterator<Key,Value> dc1 = iter1.deepCopy(null);
+ SortedKeyValueIterator<Key,Value> dc1 = iter1.deepCopy(new SampleIE());
seekLocalityGroups(dc1);
assertTrue(imm.getNumEntries() == 10);
@@ -497,6 +560,254 @@ public class InMemoryMapTest {
// seekLocalityGroups(iter1.deepCopy(null));
}
+  /**
+   * Checks sampled and unsampled reads of an in-memory map for two table configurations: one with only the sampler properties set and one that additionally
+   * defines a locality group. Each combination is read before and after {@code imm.delete(0)}, both directly and through deep copies.
+   */
+  @Test
+  public void testSample() throws Exception {
+
+    SamplerConfigurationImpl sampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "7"));
+    Sampler sampler = SamplerFactory.newSampler(sampleConfig, DefaultConfiguration.getInstance());
+
+    // first configuration: sampler properties only
+    ConfigurationCopy config1 = newConfig(tempFolder.newFolder().getAbsolutePath());
+    for (Entry<String,String> entry : sampleConfig.toTablePropertiesMap().entrySet()) {
+      config1.set(entry.getKey(), entry.getValue());
+    }
+
+    // second configuration: same sampler properties plus a locality group holding cf2
+    ConfigurationCopy config2 = newConfig(tempFolder.newFolder().getAbsolutePath());
+    config2.set(Property.TABLE_LOCALITY_GROUP_PREFIX + "lg1", LocalityGroupUtil.encodeColumnFamilies(toTextSet("cf2")));
+    config2.set(Property.TABLE_LOCALITY_GROUPS.getKey(), "lg1");
+    for (Entry<String,String> entry : sampleConfig.toTablePropertiesMap().entrySet()) {
+      config2.set(entry.getKey(), entry.getValue());
+    }
+
+    for (ConfigurationCopy config : Arrays.asList(config1, config2)) {
+
+      InMemoryMap imm = new InMemoryMap(config);
+
+      TreeMap<Key,Value> expectedSample = new TreeMap<>();
+      TreeMap<Key,Value> expectedAll = new TreeMap<>();
+      // never populated: iter0 is created before any mutation and is expected to stay empty
+      TreeMap<Key,Value> expectedNone = new TreeMap<>();
+
+      MemoryIterator iter0 = imm.skvIterator(sampleConfig);
+
+      for (int r = 0; r < 100; r++) {
+        String row = String.format("r%06d", r);
+        mutate(imm, row, "cf1:cq1", 5, "v" + (2 * r), sampler, expectedSample, expectedAll);
+        mutate(imm, row, "cf2:cq2", 5, "v" + ((2 * r) + 1), sampler, expectedSample, expectedAll);
+      }
+
+      // guard against a sampler that selects nothing, which would make the sample assertions vacuous
+      assertTrue(expectedSample.size() > 0);
+
+      // iter1 reads the sample, iter2 reads everything; the deep copies pick sampling via their environment
+      MemoryIterator iter1 = imm.skvIterator(sampleConfig);
+      MemoryIterator iter2 = imm.skvIterator(null);
+      SortedKeyValueIterator<Key,Value> iter0dc1 = iter0.deepCopy(new SampleIE());
+      SortedKeyValueIterator<Key,Value> iter0dc2 = iter0.deepCopy(new SampleIE(sampleConfig));
+      SortedKeyValueIterator<Key,Value> iter1dc1 = iter1.deepCopy(new SampleIE());
+      SortedKeyValueIterator<Key,Value> iter1dc2 = iter1.deepCopy(new SampleIE(sampleConfig));
+      SortedKeyValueIterator<Key,Value> iter2dc1 = iter2.deepCopy(new SampleIE());
+      SortedKeyValueIterator<Key,Value> iter2dc2 = iter2.deepCopy(new SampleIE(sampleConfig));
+
+      assertEquals(expectedNone, readAll(iter0));
+      assertEquals(expectedNone, readAll(iter0dc1));
+      assertEquals(expectedNone, readAll(iter0dc2));
+      assertEquals(expectedSample, readAll(iter1));
+      assertEquals(expectedAll, readAll(iter2));
+      assertEquals(expectedAll, readAll(iter1dc1));
+      assertEquals(expectedAll, readAll(iter2dc1));
+      assertEquals(expectedSample, readAll(iter1dc2));
+      assertEquals(expectedSample, readAll(iter2dc2));
+
+      // delete the map while the iterators are still open; every iterator must keep returning the same data
+      imm.delete(0);
+
+      assertEquals(expectedNone, readAll(iter0));
+      assertEquals(expectedNone, readAll(iter0dc1));
+      assertEquals(expectedNone, readAll(iter0dc2));
+      assertEquals(expectedSample, readAll(iter1));
+      assertEquals(expectedAll, readAll(iter2));
+      assertEquals(expectedAll, readAll(iter1dc1));
+      assertEquals(expectedAll, readAll(iter2dc1));
+      assertEquals(expectedSample, readAll(iter1dc2));
+      assertEquals(expectedSample, readAll(iter2dc2));
+
+      // deep copies taken after the delete must behave like the ones taken before it
+      SortedKeyValueIterator<Key,Value> iter0dc3 = iter0.deepCopy(new SampleIE());
+      SortedKeyValueIterator<Key,Value> iter0dc4 = iter0.deepCopy(new SampleIE(sampleConfig));
+      SortedKeyValueIterator<Key,Value> iter1dc3 = iter1.deepCopy(new SampleIE());
+      SortedKeyValueIterator<Key,Value> iter1dc4 = iter1.deepCopy(new SampleIE(sampleConfig));
+      SortedKeyValueIterator<Key,Value> iter2dc3 = iter2.deepCopy(new SampleIE());
+      SortedKeyValueIterator<Key,Value> iter2dc4 = iter2.deepCopy(new SampleIE(sampleConfig));
+
+      assertEquals(expectedNone, readAll(iter0dc3));
+      assertEquals(expectedNone, readAll(iter0dc4));
+      assertEquals(expectedAll, readAll(iter1dc3));
+      assertEquals(expectedAll, readAll(iter2dc3));
+      assertEquals(expectedSample, readAll(iter1dc4));
+      assertEquals(expectedSample, readAll(iter2dc4));
+
+      // release every iterator obtained from skvIterator, iter0 included
+      iter0.close();
+      iter1.close();
+      iter2.close();
+    }
+  }
+
+  /**
+   * Exercises {@code runInterruptSampleTest} for each supported combination of its three flags.
+   */
+  @Test
+  public void testInterruptingSample() throws Exception {
+    // each row is {deepCopy, delete, dcAfterDelete}
+    boolean[][] scenarios = { {false, false, false}, {false, true, false}, {true, false, false}, {true, true, false}, {true, true, true}};
+    for (boolean[] scenario : scenarios) {
+      runInterruptSampleTest(scenario[0], scenario[1], scenario[2]);
+    }
+  }
+
+  /**
+   * Reads the sample of a populated in-memory map, sets the interrupt flag and verifies that the next read fails with
+   * {@link IterationInterruptedException}.
+   *
+   * @param deepCopy
+   *          when true, the reads go through a deep copy of the memory iterator instead of the memory iterator itself
+   * @param delete
+   *          when true, {@code imm.delete(0)} is called before the reads
+   * @param dcAfterDelete
+   *          only meaningful when {@code delete} is true: false deletes before the deep copy is made, true deletes after it
+   */
+  private void runInterruptSampleTest(boolean deepCopy, boolean delete, boolean dcAfterDelete) throws Exception {
+    SamplerConfigurationImpl sampleConfig1 = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "2"));
+    Sampler sampler = SamplerFactory.newSampler(sampleConfig1, DefaultConfiguration.getInstance());
+
+    ConfigurationCopy config1 = newConfig(tempFolder.newFolder().getAbsolutePath());
+    for (Entry<String,String> entry : sampleConfig1.toTablePropertiesMap().entrySet()) {
+      config1.set(entry.getKey(), entry.getValue());
+    }
+
+    InMemoryMap imm = new InMemoryMap(config1);
+
+    TreeMap<Key,Value> expectedSample = new TreeMap<>();
+    TreeMap<Key,Value> expectedAll = new TreeMap<>();
+
+    for (int r = 0; r < 1000; r++) {
+      String row = String.format("r%06d", r);
+      mutate(imm, row, "cf1:cq1", 5, "v" + (2 * r), sampler, expectedSample, expectedAll);
+      mutate(imm, row, "cf2:cq2", 5, "v" + ((2 * r) + 1), sampler, expectedSample, expectedAll);
+    }
+
+    // guard against a sampler that selects nothing, which would make the sample assertion vacuous
+    assertTrue(expectedSample.size() > 0);
+
+    MemoryIterator miter = imm.skvIterator(sampleConfig1);
+    try {
+      // the flag is installed on the memory iterator before any deep copy is taken
+      AtomicBoolean iFlag = new AtomicBoolean(false);
+      miter.setInterruptFlag(iFlag);
+      SortedKeyValueIterator<Key,Value> iter = miter;
+
+      if (delete && !dcAfterDelete) {
+        imm.delete(0);
+      }
+
+      if (deepCopy) {
+        iter = iter.deepCopy(new SampleIE(sampleConfig1));
+      }
+
+      if (delete && dcAfterDelete) {
+        imm.delete(0);
+      }
+
+      // with the flag unset the read must succeed and return exactly the sample
+      assertEquals(expectedSample, readAll(iter));
+
+      iFlag.set(true);
+      try {
+        readAll(iter);
+        Assert.fail("expected IterationInterruptedException after the interrupt flag was set");
+      } catch (IterationInterruptedException expected) {
+        // expected: reading with the interrupt flag set must abort the iteration
+      }
+    } finally {
+      // close the memory iterator even when an assertion above fails
+      miter.close();
+    }
+  }
+
+  /**
+   * Applies a single-cell mutation to {@code imm} and records the resulting key/value in the expectation maps: always in {@code expectedAll}, and in
+   * {@code expectedSample} only when {@code sampler} accepts the key.
+   */
+  private void mutate(InMemoryMap imm, String row, String cols, int ts, String val, Sampler sampler, TreeMap<Key,Value> expectedSample,
+      TreeMap<Key,Value> expectedAll) {
+    mutate(imm, row, cols, ts, val);
+
+    Key key = nk(row, cols, ts);
+    // each map gets its own Value instance
+    expectedAll.put(key, new Value(val.getBytes()));
+    if (sampler.accept(key)) {
+      expectedSample.put(key, new Value(val.getBytes()));
+    }
+  }
+
+  /**
+   * A non-empty map configured with one sampler configuration must reject a read that asks for a different one.
+   */
+  @Test(expected = SampleNotPresentException.class)
+  public void testDifferentSampleConfig() throws Exception {
+    SamplerConfigurationImpl tableSampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(),
+        ImmutableMap.of("hasher", "murmur3_32", "modulus", "7"));
+
+    ConfigurationCopy tableConfig = newConfig(tempFolder.newFolder().getAbsolutePath());
+    for (Entry<String,String> prop : tableSampleConfig.toTablePropertiesMap().entrySet()) {
+      tableConfig.set(prop.getKey(), prop.getValue());
+    }
+
+    InMemoryMap imm = new InMemoryMap(tableConfig);
+
+    mutate(imm, "r", "cf:cq", 5, "b");
+
+    // same sampler class and hasher, different modulus
+    SamplerConfigurationImpl requestedSampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(),
+        ImmutableMap.of("hasher", "murmur3_32", "modulus", "9"));
+    MemoryIterator mismatched = imm.skvIterator(requestedSampleConfig);
+    mismatched.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+  }
+
+  /**
+   * A non-empty map created without any sampler configuration must reject a sampled read.
+   */
+  @Test(expected = SampleNotPresentException.class)
+  public void testNoSampleConfig() throws Exception {
+    String dumpDir = tempFolder.newFolder().getAbsolutePath();
+    InMemoryMap imm = newInMemoryMap(false, dumpDir);
+
+    mutate(imm, "r", "cf:cq", 5, "b");
+
+    SamplerConfigurationImpl requestedSampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(),
+        ImmutableMap.of("hasher", "murmur3_32", "modulus", "9"));
+    MemoryIterator sampled = imm.skvIterator(requestedSampleConfig);
+    sampled.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+  }
+
+  /**
+   * An empty map created without a sampler configuration must still hand out a sample iterator, which then has no entries.
+   */
+  @Test
+  public void testEmptyNoSampleConfig() throws Exception {
+    String dumpDir = tempFolder.newFolder().getAbsolutePath();
+    InMemoryMap imm = newInMemoryMap(false, dumpDir);
+
+    SamplerConfigurationImpl anySampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(),
+        ImmutableMap.of("hasher", "murmur3_32", "modulus", "9"));
+
+    // while the in-memory map is empty, a sample iterator can be obtained for any sample config
+    MemoryIterator sampled = imm.skvIterator(anySampleConfig);
+    sampled.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    Assert.assertFalse(sampled.hasTop());
+  }
+
+  /**
+   * Changes the sampler properties of the configuration after the in-memory map has been constructed but before any data is written, then checks that reads
+   * with the updated sampler configuration succeed while a read with the initial one fails with {@link SampleNotPresentException}.
+   */
+  @Test
+  public void testDeferredSamplerCreation() throws Exception {
+    SamplerConfigurationImpl initialSampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(),
+        ImmutableMap.of("hasher", "murmur3_32", "modulus", "9"));
+
+    ConfigurationCopy tableConfig = newConfig(tempFolder.newFolder().getAbsolutePath());
+    for (Entry<String,String> prop : initialSampleConfig.toTablePropertiesMap().entrySet()) {
+      tableConfig.set(prop.getKey(), prop.getValue());
+    }
+
+    InMemoryMap imm = new InMemoryMap(tableConfig);
+
+    // overwrite the sampler properties now that the in-memory map already exists
+    SamplerConfigurationImpl updatedSampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(),
+        ImmutableMap.of("hasher", "murmur3_32", "modulus", "7"));
+    for (Entry<String,String> prop : updatedSampleConfig.toTablePropertiesMap().entrySet()) {
+      tableConfig.set(prop.getKey(), prop.getValue());
+    }
+
+    TreeMap<Key,Value> expectedSample = new TreeMap<>();
+    TreeMap<Key,Value> expectedAll = new TreeMap<>();
+    Sampler sampler = SamplerFactory.newSampler(updatedSampleConfig, tableConfig);
+
+    for (int i = 0; i < 100; i++) {
+      mutate(imm, "r" + i, "cf:cq", 5, "v" + i, sampler, expectedSample, expectedAll);
+    }
+
+    // sampled read using the updated configuration
+    MemoryIterator sampleIter = imm.skvIterator(updatedSampleConfig);
+    sampleIter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    Assert.assertEquals(expectedSample, readAll(sampleIter));
+
+    // the same read through a deep copy
+    SortedKeyValueIterator<Key,Value> sampleCopy = sampleIter.deepCopy(new SampleIE(updatedSampleConfig));
+    sampleCopy.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    Assert.assertEquals(expectedSample, readAll(sampleCopy));
+
+    // unsampled read
+    MemoryIterator fullIter = imm.skvIterator(null);
+    fullIter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    Assert.assertEquals(expectedAll, readAll(fullIter));
+
+    // the initial configuration no longer matches, so seeking is expected to throw
+    MemoryIterator staleIter = imm.skvIterator(initialSampleConfig);
+    thrown.expect(SampleNotPresentException.class);
+    staleIter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+  }
+
+  /**
+   * Seeks {@code iter} over the full range and collects every entry it returns.
+   *
+   * @param iter
+   *          iterator to drain; it is re-seeked first, so it may be passed in any position
+   * @return the entries read, in key order
+   * @throws IOException
+   *           if seeking or advancing the iterator fails
+   */
+  private TreeMap<Key,Value> readAll(SortedKeyValueIterator<Key,Value> iter) throws IOException {
+    iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+
+    TreeMap<Key,Value> actual = new TreeMap<>();
+    while (iter.hasTop()) {
+      // iterators may reuse the objects returned by getTopKey()/getTopValue() once next() is called, so store copies
+      actual.put(new Key(iter.getTopKey()), new Value(iter.getTopValue()));
+      iter.next();
+    }
+    return actual;
+  }
+
private void seekLocalityGroups(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
iter1.seek(new Range(), newCFSet("cf1"), true);
ae(iter1, "r1", "cf1:x", 2, "1");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
index 55226fb..0388c1f 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.file.NoSuchMetaStoreException;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.hadoop.io.Text;
@@ -133,6 +134,11 @@ public class DefaultCompactionStrategyTest {
@Override
public void close() throws IOException {}
+ @Override
+ public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+ // no sample is provided by this implementation: null is returned for every sampleConfig
+ return null;
+ }
+
}
static final DefaultConfiguration dfault = AccumuloConfiguration.getDefaultConfiguration();