You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/08/25 18:10:04 UTC
[2/2] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Conflicts:
server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3d3d301f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3d3d301f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3d3d301f
Branch: refs/heads/1.6.1-SNAPSHOT
Commit: 3d3d301f4ab39efd27d1deb1df438a1df0b9c1e8
Parents: 23c74cd 30a0ca3
Author: Keith Turner <kt...@apache.org>
Authored: Mon Aug 25 11:51:38 2014 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Aug 25 11:51:38 2014 -0400
----------------------------------------------------------------------
.../system/SourceSwitchingIterator.java | 2 +-
.../apache/accumulo/tserver/InMemoryMap.java | 61 +++++++++++++-------
.../accumulo/tserver/InMemoryMapTest.java | 44 ++++++++++++++
3 files changed, 85 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3d3d301f/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index dc36718,0000000..5f6d9ce
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@@ -1,753 -1,0 +1,772 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+class MemKeyComparator implements Comparator<Key>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int compare(Key k1, Key k2) {
+ int cmp = k1.compareTo(k2);
+
+ if (cmp == 0) {
+ if (k1 instanceof MemKey)
+ if (k2 instanceof MemKey)
+ cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
+ else
+ cmp = 1;
+ else if (k2 instanceof MemKey)
+ cmp = -1;
+ }
+
+ return cmp;
+ }
+}
+
+class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
+
+ int kvCount;
+
+ public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
+ setSource(source);
+ this.kvCount = maxKVCount;
+ }
+
+ @Override
+ protected void consume() throws IOException {
+ while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
+ getSource().next();
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
+ }
+
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+ }
+
+}
+
+class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator {
+ MemKey currKey = null;
+ Value currVal = null;
+
+ public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
+ super();
+ setSource(source);
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new MemKeyConversionIterator(getSource().deepCopy(env));
+ }
+
+ @Override
+ public Key getTopKey() {
+ return currKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return currVal;
+ }
+
+ private void getTopKeyVal() {
+ Key k = super.getTopKey();
+ Value v = super.getTopValue();
+ if (k instanceof MemKey || k == null) {
+ currKey = (MemKey) k;
+ currVal = v;
+ return;
+ }
+ currVal = new Value(v);
+ int mc = MemValue.splitKVCount(currVal);
+ currKey = new MemKey(k, mc);
+
+ }
+
+ public void next() throws IOException {
+ super.next();
+ if (hasTop())
+ getTopKeyVal();
+ }
+
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ super.seek(range, columnFamilies, inclusive);
+
+ if (hasTop())
+ getTopKeyVal();
+
+ Key k = range.getStartKey();
+ if (k instanceof MemKey && hasTop()) {
+ while (hasTop() && currKey.compareTo(k) < 0)
+ next();
+ }
+ }
+
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+ }
+
+}
+
+public class InMemoryMap {
+ private SimpleMap map = null;
+
+ private static final Logger log = Logger.getLogger(InMemoryMap.class);
+
+ private volatile String memDumpFile = null;
+ private final String memDumpDir;
+
+ private Map<String,Set<ByteSequence>> lggroups;
+
+ public InMemoryMap(boolean useNativeMap, String memDumpDir) {
+ this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
+ }
+
+ public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
+ this.memDumpDir = memDumpDir;
+ this.lggroups = lggroups;
+
+ if (lggroups.size() == 0)
+ map = newMap(useNativeMap);
+ else
+ map = new LocalityGroupMap(lggroups, useNativeMap);
+ }
+
+ public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
+ this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
+ }
+
+ private static SimpleMap newMap(boolean useNativeMap) {
+ if (useNativeMap && NativeMap.isLoaded()) {
+ try {
+ return new NativeMapWrapper();
+ } catch (Throwable t) {
+ log.error("Failed to create native map", t);
+ }
+ }
+
+ return new DefaultMap();
+ }
+
+ private interface SimpleMap {
+ Value get(Key key);
+
+ Iterator<Entry<Key,Value>> iterator(Key startKey);
+
+ int size();
+
+ InterruptibleIterator skvIterator();
+
+ void delete();
+
+ long getMemoryUsed();
+
+ void mutate(List<Mutation> mutations, int kvCount);
+ }
+
+ private static class LocalityGroupMap implements SimpleMap {
+
+ private Map<ByteSequence,MutableLong> groupFams[];
+
+ // the last map in the array is the default locality group
+ private SimpleMap maps[];
+ private Partitioner partitioner;
+ private List<Mutation>[] partitioned;
+ private Set<ByteSequence> nonDefaultColumnFamilies;
+
+ @SuppressWarnings("unchecked")
+ LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
+ this.groupFams = new Map[groups.size()];
+ this.maps = new SimpleMap[groups.size() + 1];
+ this.partitioned = new List[groups.size() + 1];
+ this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
+
+ for (int i = 0; i < maps.length; i++) {
+ maps[i] = newMap(useNativeMap);
+ }
+
+ int count = 0;
+ for (Set<ByteSequence> cfset : groups.values()) {
+ HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>();
+ for (ByteSequence bs : cfset)
+ map.put(bs, new MutableLong(1));
+ this.groupFams[count++] = map;
+ nonDefaultColumnFamilies.addAll(cfset);
+ }
+
+ partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
+
+ for (int i = 0; i < partitioned.length; i++) {
+ partitioned[i] = new ArrayList<Mutation>();
+ }
+ }
+
+ @Override
+ public Value get(Key key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size() {
+ int sum = 0;
+ for (SimpleMap map : maps)
+ sum += map.size();
+ return sum;
+ }
+
+ @Override
+ public InterruptibleIterator skvIterator() {
+ LocalityGroup groups[] = new LocalityGroup[maps.length];
+ for (int i = 0; i < groups.length; i++) {
+ if (i < groupFams.length)
+ groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
+ else
+ groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
+ }
+
+
+ return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
+ }
+
+ @Override
+ public void delete() {
+ for (SimpleMap map : maps)
+ map.delete();
+ }
+
+ @Override
+ public long getMemoryUsed() {
+ long sum = 0;
+ for (SimpleMap map : maps)
+ sum += map.getMemoryUsed();
+ return sum;
+ }
+
+ @Override
+ public synchronized void mutate(List<Mutation> mutations, int kvCount) {
+ // this method is synchronized because it reuses objects to avoid allocation,
+ // currently, the method that calls this is synchronized so there is no
+ // loss in parallelism.... synchronization was added here for future proofing
+
+ try{
+ partitioner.partition(mutations, partitioned);
+
+ for (int i = 0; i < partitioned.length; i++) {
+ if (partitioned[i].size() > 0) {
+ maps[i].mutate(partitioned[i], kvCount);
+ for (Mutation m : partitioned[i])
+ kvCount += m.getUpdates().size();
+ }
+ }
+ } finally {
+ // clear immediately so mutations can be garbage collected
+ for (List<Mutation> list : partitioned) {
+ list.clear();
+ }
+ }
+ }
+
+ }
+
+ private static class DefaultMap implements SimpleMap {
+ private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
+ private AtomicLong bytesInMemory = new AtomicLong();
+ private AtomicInteger size = new AtomicInteger();
+
+ public void put(Key key, Value value) {
+ // Always a MemKey, so account for the kvCount int
+ bytesInMemory.addAndGet(key.getLength() + 4);
+ bytesInMemory.addAndGet(value.getSize());
+ if (map.put(key, value) == null)
+ size.incrementAndGet();
+ }
+
+ public Value get(Key key) {
+ return map.get(key);
+ }
+
+ public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+ Key lk = new Key(startKey);
+ SortedMap<Key,Value> tm = map.tailMap(lk);
+ return tm.entrySet().iterator();
+ }
+
+ public int size() {
+ return size.get();
+ }
+
+ public synchronized InterruptibleIterator skvIterator() {
+ if (map == null)
+ throw new IllegalStateException();
+
+ return new SortedMapIterator(map);
+ }
+
+ public synchronized void delete() {
+ map = null;
+ }
+
+ public long getOverheadPerEntry() {
+ // all of the java objects that are used to hold the
+ // data and make it searchable have overhead... this
+ // overhead is estimated using test.EstimateInMemMapOverhead
+ // and is in bytes.. the estimates were obtained by running
+ // java 6_16 in 64 bit server mode
+
+ return 200;
+ }
+
+ @Override
+ public void mutate(List<Mutation> mutations, int kvCount) {
+ for (Mutation m : mutations) {
+ for (ColumnUpdate cvp : m.getUpdates()) {
+ Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
+ false, kvCount++);
+ Value value = new Value(cvp.getValue());
+ put(newKey, value);
+ }
+ }
+ }
+
+ @Override
+ public long getMemoryUsed() {
+ return bytesInMemory.get() + (size() * getOverheadPerEntry());
+ }
+ }
+
+ private static class NativeMapWrapper implements SimpleMap {
+ private NativeMap nativeMap;
+
+ NativeMapWrapper() {
+ nativeMap = new NativeMap();
+ }
+
+ public Value get(Key key) {
+ return nativeMap.get(key);
+ }
+
+ public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+ return nativeMap.iterator(startKey);
+ }
+
+ public int size() {
+ return nativeMap.size();
+ }
+
+ public InterruptibleIterator skvIterator() {
+ return (InterruptibleIterator) nativeMap.skvIterator();
+ }
+
+ public void delete() {
+ nativeMap.delete();
+ }
+
+ public long getMemoryUsed() {
+ return nativeMap.getMemoryUsed();
+ }
+
+ @Override
+ public void mutate(List<Mutation> mutations, int kvCount) {
+ nativeMap.mutate(mutations, kvCount);
+ }
+ }
+
+ private AtomicInteger nextKVCount = new AtomicInteger(1);
+ private AtomicInteger kvCount = new AtomicInteger(0);
+
+ private Object writeSerializer = new Object();
+
+ /**
+ * Applies changes to a row in the InMemoryMap
+ *
+ */
+ public void mutate(List<Mutation> mutations) {
+ int numKVs = 0;
+ for (int i = 0; i < mutations.size(); i++)
+ numKVs += mutations.get(i).size();
+
+ // Can not update mutationCount while writes that started before
+ // are in progress, this would cause partial mutations to be seen.
+ // Also, can not continue until mutation count is updated, because
+ // a read may not see a successful write. Therefore writes must
+ // wait for writes that started before to finish.
+ //
+ // using separate lock from this map, to allow read/write in parallel
+ synchronized (writeSerializer ) {
+ int kv = nextKVCount.getAndAdd(numKVs);
+ try {
+ map.mutate(mutations, kv);
+ } finally {
+ kvCount.set(kv + numKVs - 1);
+ }
+ }
+ }
+
+ /**
+ * Returns a long representing the size of the InMemoryMap
+ *
+ * @return bytesInMemory
+ */
+ public synchronized long estimatedSizeInBytes() {
+ if (map == null)
+ return 0;
+
+ return map.getMemoryUsed();
+ }
+
+ Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
+ return map.iterator(startKey);
+ }
+
+ public long getNumEntries() {
+ return map.size();
+ }
+
+ private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>());
+
+ class MemoryDataSource implements DataSource {
+
+ boolean switched = false;
+ private InterruptibleIterator iter;
- private List<FileSKVIterator> readers;
++ private FileSKVIterator reader;
++ private MemoryDataSource parent;
++ private IteratorEnvironment env;
+
+ MemoryDataSource() {
- this(new ArrayList<FileSKVIterator>());
++ this(null, false, null);
+ }
+
- public MemoryDataSource(List<FileSKVIterator> readers) {
- this.readers = readers;
++ public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env) {
++ this.parent = parent;
++ this.switched = switched;
++ this.env = env;
+ }
+
+ @Override
+ public boolean isCurrent() {
+ if (switched)
+ return true;
+ else
+ return memDumpFile == null;
+ }
+
+ @Override
+ public DataSource getNewDataSource() {
+ if (switched)
+ throw new IllegalStateException();
+
+ if (!isCurrent()) {
+ switched = true;
+ iter = null;
++ try {
++ // ensure files are referenced even if iterator was never seeked before
++ iterator();
++ } catch (IOException e) {
++ throw new RuntimeException();
++ }
+ }
+
+ return this;
+ }
+
++ private synchronized FileSKVIterator getReader() throws IOException {
++ if (reader == null) {
++ Configuration conf = CachedConfiguration.getInstance();
++ FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
++
++ reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
++ }
++
++ return reader;
++ }
++
+ @Override
+ public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+ if (iter == null)
+ if (!switched)
+ iter = map.skvIterator();
+ else {
-
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
-
- FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
-
- readers.add(reader);
-
- iter = new MemKeyConversionIterator(reader);
++ if (parent == null)
++ iter = new MemKeyConversionIterator(getReader());
++ else
++ synchronized (parent) {
++ // synchronize deep copy operation on parent, this prevents multiple threads from deep copying the rfile shared from parent its possible that the
++ // thread deleting an InMemoryMap and scan thread could be switching different deep copies
++ iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env));
++ }
+ }
+
+ return iter;
+ }
+
+ @Override
+ public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
- return new MemoryDataSource(readers);
++ return new MemoryDataSource(parent == null ? this : parent, switched, env);
+ }
+
+ }
+
+ class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
+
+ private AtomicBoolean closed;
+ private SourceSwitchingIterator ssi;
+ private MemoryDataSource mds;
+
+ protected SortedKeyValueIterator<Key,Value> getSource() {
+ if (closed.get())
+ throw new IllegalStateException("Memory iterator is closed");
+ return super.getSource();
+ }
+
+ private MemoryIterator(InterruptibleIterator source) {
+ this(source, new AtomicBoolean(false));
+ }
+
+ private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) {
+ setSource(source);
+ this.closed = closed;
+ }
+
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new MemoryIterator(getSource().deepCopy(env), closed);
+ }
+
+ public void close() {
+
+ synchronized (this) {
+ if (closed.compareAndSet(false, true)) {
-
- for (FileSKVIterator reader : mds.readers)
- try {
- reader.close();
- } catch (IOException e) {
- log.warn(e, e);
- }
++ try {
++ if (mds.reader != null)
++ mds.reader.close();
++ } catch (IOException e) {
++ log.warn(e, e);
++ }
+ }
+ }
+
+ // remove outside of sync to avoid deadlock
+ activeIters.remove(this);
+ }
+
+ private synchronized boolean switchNow() throws IOException {
+ if (closed.get())
+ return false;
+
+ ssi.switchNow();
+ return true;
+ }
+
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+ }
+
+ private void setSSI(SourceSwitchingIterator ssi) {
+ this.ssi = ssi;
+ }
+
+ public void setMDS(MemoryDataSource mds) {
+ this.mds = mds;
+ }
+
+ }
+
+ public synchronized MemoryIterator skvIterator() {
+ if (map == null)
+ throw new NullPointerException();
+
+ if (deleted)
+ throw new IllegalStateException("Can not obtain iterator after map deleted");
+
+ int mc = kvCount.get();
+ MemoryDataSource mds = new MemoryDataSource();
+ SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
+ MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
+ mi.setSSI(ssi);
+ mi.setMDS(mds);
+ activeIters.add(mi);
+ return mi;
+ }
+
+ public SortedKeyValueIterator<Key,Value> compactionIterator() {
+
+ if (nextKVCount.get() - 1 != kvCount.get())
+ throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
+ + kvCount.get());
+
+ return map.skvIterator();
+ }
+
+ private boolean deleted = false;
+
+ public void delete(long waitTime) {
+
+ synchronized (this) {
+ if (deleted)
+ throw new IllegalStateException("Double delete");
+
+ deleted = true;
+ }
+
+ long t1 = System.currentTimeMillis();
+
+ while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) {
+ UtilWaitThread.sleep(50);
+ }
+
+ if (activeIters.size() > 0) {
+ // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file
+ try {
+ Configuration conf = CachedConfiguration.getInstance();
+ FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+
+ String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
+
+ Configuration newConf = new Configuration(conf);
+ newConf.setInt("io.seqfile.compress.blocksize", 100000);
+
+ FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
+
+ InterruptibleIterator iter = map.skvIterator();
+
+ HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
+
+ for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
+ allfams.addAll(entry.getValue());
+ out.startNewLocalityGroup(entry.getKey(), entry.getValue());
+ iter.seek(new Range(), entry.getValue(), true);
+ dumpLocalityGroup(out, iter);
+ }
+
+ out.startDefaultLocalityGroup();
+ iter.seek(new Range(), allfams, false);
+
+ dumpLocalityGroup(out, iter);
+
+ out.close();
+
+ log.debug("Created mem dump file " + tmpFile);
+
+ memDumpFile = tmpFile;
+
+ synchronized (activeIters) {
+ for (MemoryIterator mi : activeIters) {
+ mi.switchNow();
+ }
+ }
+
+ // rely on unix behavior that file will be deleted when last
+ // reader closes it
+ fs.delete(new Path(memDumpFile), true);
+
+ } catch (IOException ioe) {
+ log.error("Failed to create mem dump file ", ioe);
+
+ while (activeIters.size() > 0) {
+ UtilWaitThread.sleep(100);
+ }
+ }
+
+ }
+
+ SimpleMap tmpMap = map;
+
+ synchronized (this) {
+ map = null;
+ }
+
+ tmpMap.delete();
+ }
+
+ private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
+ while (iter.hasTop() && activeIters.size() > 0) {
+ // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
+ // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
+ Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+ out.append(iter.getTopKey(), newValue);
+ iter.next();
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3d3d301f/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
index dc7ee99,0000000..3932552
mode 100644,000000..100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
@@@ -1,513 -1,0 +1,557 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ZooConfiguration;
+import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class InMemoryMapTest {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // suppress log messages having to do with not having an instance
+ Logger.getLogger(ZooConfiguration.class).setLevel(Level.OFF);
+ Logger.getLogger(HdfsZooInstance.class).setLevel(Level.OFF);
+ }
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+ public void mutate(InMemoryMap imm, String row, String column, long ts) {
+ Mutation m = new Mutation(new Text(row));
+ String[] sa = column.split(":");
+ m.putDelete(new Text(sa[0]), new Text(sa[1]), ts);
+
+ imm.mutate(Collections.singletonList(m));
+ }
+
+ public void mutate(InMemoryMap imm, String row, String column, long ts, String value) {
+ Mutation m = new Mutation(new Text(row));
+ String[] sa = column.split(":");
+ m.put(new Text(sa[0]), new Text(sa[1]), ts, new Value(value.getBytes()));
+
+ imm.mutate(Collections.singletonList(m));
+ }
+
+ static Key nk(String row, String column, long ts) {
+ String[] sa = column.split(":");
+ Key k = new Key(new Text(row), new Text(sa[0]), new Text(sa[1]), ts);
+ return k;
+ }
+
+ static void ae(SortedKeyValueIterator<Key,Value> dc, String row, String column, int ts, String val) throws IOException {
+ assertTrue(dc.hasTop());
+ assertEquals(nk(row, column, ts), dc.getTopKey());
+ assertEquals(new Value(val.getBytes()), dc.getTopValue());
+ dc.next();
+
+ }
+
+ static Set<ByteSequence> newCFSet(String... cfs) {
+ HashSet<ByteSequence> cfSet = new HashSet<ByteSequence>();
+ for (String cf : cfs) {
+ cfSet.add(new ArrayByteSequence(cf));
+ }
+ return cfSet;
+ }
+
+ @Test
+ public void test2() throws Exception {
+ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ MemoryIterator ski1 = imm.skvIterator();
+ mutate(imm, "r1", "foo:cq1", 3, "bar1");
+ MemoryIterator ski2 = imm.skvIterator();
+
+ ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+ assertFalse(ski1.hasTop());
+
+ ski2.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+ assertTrue(ski2.hasTop());
+ ae(ski2, "r1", "foo:cq1", 3, "bar1");
+ assertFalse(ski2.hasTop());
+
+ }
+
+ @Test
+ public void test3() throws Exception {
+ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ mutate(imm, "r1", "foo:cq1", 3, "bar1");
+ mutate(imm, "r1", "foo:cq1", 3, "bar2");
+ MemoryIterator ski1 = imm.skvIterator();
+ mutate(imm, "r1", "foo:cq1", 3, "bar3");
+
+ mutate(imm, "r3", "foo:cq1", 3, "bar9");
+ mutate(imm, "r3", "foo:cq1", 3, "bara");
+
+ MemoryIterator ski2 = imm.skvIterator();
+
+ ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(ski1, "r1", "foo:cq1", 3, "bar2");
+ ae(ski1, "r1", "foo:cq1", 3, "bar1");
+ assertFalse(ski1.hasTop());
+
+ ski2.seek(new Range(new Text("r3")), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(ski2, "r3", "foo:cq1", 3, "bara");
+ ae(ski2, "r3", "foo:cq1", 3, "bar9");
+ assertFalse(ski1.hasTop());
+
+ }
+
+ @Test
+ public void test4() throws Exception {
+ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ mutate(imm, "r1", "foo:cq1", 3, "bar1");
+ mutate(imm, "r1", "foo:cq1", 3, "bar2");
+ MemoryIterator ski1 = imm.skvIterator();
+ mutate(imm, "r1", "foo:cq1", 3, "bar3");
+
+ imm.delete(0);
+
+ ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(ski1, "r1", "foo:cq1", 3, "bar2");
+ ae(ski1, "r1", "foo:cq1", 3, "bar1");
+ assertFalse(ski1.hasTop());
+
+ ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(ski1, "r1", "foo:cq1", 3, "bar2");
+ ae(ski1, "r1", "foo:cq1", 3, "bar1");
+ assertFalse(ski1.hasTop());
+
+ ski1.seek(new Range(new Text("r2")), LocalityGroupUtil.EMPTY_CF_SET, false);
+ assertFalse(ski1.hasTop());
+
+ ski1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(ski1, "r1", "foo:cq1", 3, "bar2");
+ ae(ski1, "r1", "foo:cq1", 3, "bar1");
+ assertFalse(ski1.hasTop());
+
+ ski1.close();
+ }
+
+ @Test
+ public void test5() throws Exception {
+ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ mutate(imm, "r1", "foo:cq1", 3, "bar1");
+ mutate(imm, "r1", "foo:cq1", 3, "bar2");
+ mutate(imm, "r1", "foo:cq1", 3, "bar3");
+
+ MemoryIterator ski1 = imm.skvIterator();
+ ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(ski1, "r1", "foo:cq1", 3, "bar3");
+
+ imm.delete(0);
+
+ ae(ski1, "r1", "foo:cq1", 3, "bar2");
+ ae(ski1, "r1", "foo:cq1", 3, "bar1");
+ assertFalse(ski1.hasTop());
+
+ ski1.close();
+
+ imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ mutate(imm, "r1", "foo:cq1", 3, "bar1");
+ mutate(imm, "r1", "foo:cq2", 3, "bar2");
+ mutate(imm, "r1", "foo:cq3", 3, "bar3");
+
+ ski1 = imm.skvIterator();
+ ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(ski1, "r1", "foo:cq1", 3, "bar1");
+
+ imm.delete(0);
+
+ ae(ski1, "r1", "foo:cq2", 3, "bar2");
+ ae(ski1, "r1", "foo:cq3", 3, "bar3");
+ assertFalse(ski1.hasTop());
+
+ ski1.close();
+ }
+
+ @Test
+ public void test6() throws Exception {
+ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ mutate(imm, "r1", "foo:cq1", 3, "bar1");
+ mutate(imm, "r1", "foo:cq2", 3, "bar2");
+ mutate(imm, "r1", "foo:cq3", 3, "bar3");
+ mutate(imm, "r1", "foo:cq4", 3, "bar4");
+
+ MemoryIterator ski1 = imm.skvIterator();
+
+ mutate(imm, "r1", "foo:cq5", 3, "bar5");
+
+ SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);
+
+ ski1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(ski1, "r1", "foo:cq1", 3, "bar1");
+
+ dc.seek(new Range(nk("r1", "foo:cq2", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(dc, "r1", "foo:cq2", 3, "bar2");
+
+ imm.delete(0);
+
+ ae(ski1, "r1", "foo:cq2", 3, "bar2");
+ ae(dc, "r1", "foo:cq3", 3, "bar3");
+ ae(ski1, "r1", "foo:cq3", 3, "bar3");
+ ae(dc, "r1", "foo:cq4", 3, "bar4");
+ ae(ski1, "r1", "foo:cq4", 3, "bar4");
+ assertFalse(ski1.hasTop());
+ assertFalse(dc.hasTop());
+
+ ski1.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+
+ dc.seek(new Range(nk("r1", "foo:cq4", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(dc, "r1", "foo:cq4", 3, "bar4");
+ assertFalse(dc.hasTop());
+
+ ae(ski1, "r1", "foo:cq3", 3, "bar3");
+ ae(ski1, "r1", "foo:cq4", 3, "bar4");
+ assertFalse(ski1.hasTop());
+ assertFalse(dc.hasTop());
+
+ ski1.close();
+ }
+
++ private void deepCopyAndDelete(int interleaving) throws Exception {
++ // interleaving == 0 intentionally omitted, this runs the test w/o deleting in mem map
++
++ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
++
++ mutate(imm, "r1", "foo:cq1", 3, "bar1");
++ mutate(imm, "r1", "foo:cq2", 3, "bar2");
++
++ MemoryIterator ski1 = imm.skvIterator();
++
++ if (interleaving == 1)
++ imm.delete(0);
++
++ SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);
++
++ if (interleaving == 2)
++ imm.delete(0);
++
++ dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
++ ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
++
++ if (interleaving == 3)
++ imm.delete(0);
++
++ ae(dc, "r1", "foo:cq1", 3, "bar1");
++ ae(ski1, "r1", "foo:cq1", 3, "bar1");
++ dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
++
++ if (interleaving == 4)
++ imm.delete(0);
++
++ ae(ski1, "r1", "foo:cq2", 3, "bar2");
++ ae(dc, "r1", "foo:cq1", 3, "bar1");
++ ae(dc, "r1", "foo:cq2", 3, "bar2");
++ assertFalse(dc.hasTop());
++ assertFalse(ski1.hasTop());
++ }
++
++ @Test
++ public void testDeepCopyAndDelete() throws Exception {
++ for (int i = 0; i <= 4; i++)
++ deepCopyAndDelete(i);
++ }
++
+ @Test
+ public void testBug1() throws Exception {
+ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ for (int i = 0; i < 20; i++) {
+ mutate(imm, "r1", "foo:cq" + i, 3, "bar" + i);
+ }
+
+ for (int i = 0; i < 20; i++) {
+ mutate(imm, "r2", "foo:cq" + i, 3, "bar" + i);
+ }
+
+ MemoryIterator ski1 = imm.skvIterator();
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(ski1);
+
+ imm.delete(0);
+
+ ArrayList<ByteSequence> columns = new ArrayList<ByteSequence>();
+ columns.add(new ArrayByteSequence("bar"));
+
+ // this seek resulted in an infinite loop before a bug was fixed
+ cfsi.seek(new Range("r1"), columns, true);
+
+ assertFalse(cfsi.hasTop());
+
+ ski1.close();
+ }
+
+ @Test
+ public void testSeekBackWards() throws Exception {
+ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ mutate(imm, "r1", "foo:cq1", 3, "bar1");
+ mutate(imm, "r1", "foo:cq2", 3, "bar2");
+ mutate(imm, "r1", "foo:cq3", 3, "bar3");
+ mutate(imm, "r1", "foo:cq4", 3, "bar4");
+
+ MemoryIterator skvi1 = imm.skvIterator();
+
+ skvi1.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(skvi1, "r1", "foo:cq3", 3, "bar3");
+
+ skvi1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(skvi1, "r1", "foo:cq1", 3, "bar1");
+
+ }
+
+ @Test
+ public void testDuplicateKey() throws Exception {
+ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+
+ Mutation m = new Mutation(new Text("r1"));
+ m.put(new Text("foo"), new Text("cq"), 3, new Value("v1".getBytes()));
+ m.put(new Text("foo"), new Text("cq"), 3, new Value("v2".getBytes()));
+ imm.mutate(Collections.singletonList(m));
+
+ MemoryIterator skvi1 = imm.skvIterator();
+ skvi1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ae(skvi1, "r1", "foo:cq", 3, "v2");
+ ae(skvi1, "r1", "foo:cq", 3, "v1");
+ }
+
+ private static final Logger log = Logger.getLogger(InMemoryMapTest.class);
+
+ static long sum(long[] counts) {
+ long result = 0;
+ for (int i = 0; i < counts.length; i++)
+ result += counts[i];
+ return result;
+ }
+
+ // - hard to get this timing test to run well on apache build machines
+ @Test
+ @Ignore
+ public void parallelWriteSpeed() throws InterruptedException, IOException {
+ List<Double> timings = new ArrayList<Double>();
+ for (int threads : new int[] {1, 2, 16, /* 64, 256 */}) {
+ final long now = System.currentTimeMillis();
+ final long counts[] = new long[threads];
+ final InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+ ExecutorService e = Executors.newFixedThreadPool(threads);
+ for (int j = 0; j < threads; j++) {
+ final int threadId = j;
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ while (System.currentTimeMillis() - now < 1000) {
+ for (int k = 0; k < 1000; k++) {
+ Mutation m = new Mutation("row");
+ m.put("cf", "cq", new Value("v".getBytes()));
+ List<Mutation> mutations = Collections.singletonList(m);
+ imm.mutate(mutations);
+ counts[threadId]++;
+ }
+ }
+ }
+ });
+ }
+ e.shutdown();
+ e.awaitTermination(10, TimeUnit.SECONDS);
+ imm.delete(10000);
+ double mutationsPerSecond = sum(counts) / ((System.currentTimeMillis() - now) / 1000.);
+ timings.add(mutationsPerSecond);
+ log.info(String.format("%.1f mutations per second with %d threads", mutationsPerSecond, threads));
+ }
+ // verify that more threads doesn't go a lot faster, or a lot slower than one thread
+ for (int i = 0; i < timings.size(); i++) {
+ double ratioFirst = timings.get(0) / timings.get(i);
+ assertTrue(ratioFirst < 3);
+ assertTrue(ratioFirst > 0.3);
+ }
+ }
+
+ @Test
+ public void testLocalityGroups() throws Exception {
+
+ Map<String,Set<ByteSequence>> lggroups1 = new HashMap<String,Set<ByteSequence>>();
+ lggroups1.put("lg1", newCFSet("cf1", "cf2"));
+ lggroups1.put("lg2", newCFSet("cf3", "cf4"));
+
+ InMemoryMap imm = new InMemoryMap(lggroups1, false, tempFolder.newFolder().getAbsolutePath());
+
+ Mutation m1 = new Mutation("r1");
+ m1.put("cf1", "x", 2, "1");
+ m1.put("cf1", "y", 2, "2");
+ m1.put("cf3", "z", 2, "3");
+ m1.put("foo", "b", 2, "9");
+
+ Mutation m2 = new Mutation("r2");
+ m2.put("cf2", "x", 3, "5");
+
+ Mutation m3 = new Mutation("r3");
+ m3.put("foo", "b", 4, "6");
+
+ Mutation m4 = new Mutation("r4");
+ m4.put("foo", "b", 5, "7");
+ m4.put("cf4", "z", 5, "8");
+
+ Mutation m5 = new Mutation("r5");
+ m5.put("cf3", "z", 6, "A");
+ m5.put("cf4", "z", 6, "B");
+
+ imm.mutate(Arrays.asList(m1, m2, m3, m4, m5));
+
+ MemoryIterator iter1 = imm.skvIterator();
+
+ seekLocalityGroups(iter1);
+ SortedKeyValueIterator<Key,Value> dc1 = iter1.deepCopy(null);
+ seekLocalityGroups(dc1);
+
+ assertTrue(imm.getNumEntries() == 10);
+ assertTrue(imm.estimatedSizeInBytes() > 0);
+
+ imm.delete(0);
+
+ seekLocalityGroups(iter1);
+ seekLocalityGroups(dc1);
+ // TODO uncomment following when ACCUMULO-1628 is fixed
+ // seekLocalityGroups(iter1.deepCopy(null));
+ }
+
+ private void seekLocalityGroups(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
+ iter1.seek(new Range(), newCFSet("cf1"), true);
+ ae(iter1, "r1", "cf1:x", 2, "1");
+ ae(iter1, "r1", "cf1:y", 2, "2");
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ assertFalse(iter1.hasTop());
+
+ iter1.seek(new Range("r2", "r4"), newCFSet("cf1"), true);
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ assertFalse(iter1.hasTop());
+
+ iter1.seek(new Range(), newCFSet("cf3"), true);
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ ae(iter1, "r5", "cf3:z", 6, "A");
+ ae(iter1, "r5", "cf4:z", 6, "B");
+ assertFalse(iter1.hasTop());
+
+ iter1.seek(new Range(), newCFSet("foo"), true);
+ ae(iter1, "r1", "foo:b", 2, "9");
+ ae(iter1, "r3", "foo:b", 4, "6");
+ ae(iter1, "r4", "foo:b", 5, "7");
+ assertFalse(iter1.hasTop());
+
+ iter1.seek(new Range(), newCFSet("cf1", "cf3"), true);
+ ae(iter1, "r1", "cf1:x", 2, "1");
+ ae(iter1, "r1", "cf1:y", 2, "2");
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ ae(iter1, "r5", "cf3:z", 6, "A");
+ ae(iter1, "r5", "cf4:z", 6, "B");
+ assertFalse(iter1.hasTop());
+
+ iter1.seek(new Range("r2", "r4"), newCFSet("cf1", "cf3"), true);
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ assertFalse(iter1.hasTop());
+
+ iter1.seek(new Range(), newCFSet("cf1", "cf3", "foo"), true);
+ assertAll(iter1);
+
+ iter1.seek(new Range("r1", "r2"), newCFSet("cf1", "cf3", "foo"), true);
+ ae(iter1, "r1", "cf1:x", 2, "1");
+ ae(iter1, "r1", "cf1:y", 2, "2");
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r1", "foo:b", 2, "9");
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ assertFalse(iter1.hasTop());
+
+ iter1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+ assertAll(iter1);
+
+ iter1.seek(new Range(), newCFSet("cf1"), false);
+ assertAll(iter1);
+
+ iter1.seek(new Range(), newCFSet("cf1", "cf2"), false);
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r1", "foo:b", 2, "9");
+ ae(iter1, "r3", "foo:b", 4, "6");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ ae(iter1, "r4", "foo:b", 5, "7");
+ ae(iter1, "r5", "cf3:z", 6, "A");
+ ae(iter1, "r5", "cf4:z", 6, "B");
+ assertFalse(iter1.hasTop());
+
+ iter1.seek(new Range("r2"), newCFSet("cf1", "cf3", "foo"), true);
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ assertFalse(iter1.hasTop());
+ }
+
+ private void assertAll(SortedKeyValueIterator<Key,Value> iter1) throws IOException {
+ ae(iter1, "r1", "cf1:x", 2, "1");
+ ae(iter1, "r1", "cf1:y", 2, "2");
+ ae(iter1, "r1", "cf3:z", 2, "3");
+ ae(iter1, "r1", "foo:b", 2, "9");
+ ae(iter1, "r2", "cf2:x", 3, "5");
+ ae(iter1, "r3", "foo:b", 4, "6");
+ ae(iter1, "r4", "cf4:z", 5, "8");
+ ae(iter1, "r4", "foo:b", 5, "7");
+ ae(iter1, "r5", "cf3:z", 6, "A");
+ ae(iter1, "r5", "cf4:z", 6, "B");
+ assertFalse(iter1.hasTop());
+ }
+}