You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2017/04/13 18:13:41 UTC

[2/6] accumulo git commit: ACCUMULO-3208 Integration test for the OrIterator and cleanup

ACCUMULO-3208 Integration test for the OrIterator and cleanup

The OrIterator was in very bad shape, with next-to-no documentation
about what it actually does.

Closes apache/accumulo#247


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5ac15742
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5ac15742
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5ac15742

Branch: refs/heads/1.8
Commit: 5ac1574243f4445399dbc76da9392fb393f7f69e
Parents: 8c0f03a
Author: Josh Elser <el...@apache.org>
Authored: Sun Apr 9 22:45:56 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Apr 13 12:42:38 2017 -0400

----------------------------------------------------------------------
 .../accumulo/core/iterators/OrIterator.java     | 263 ++++++++-----
 .../org/apache/accumulo/test/OrIteratorIT.java  | 389 +++++++++++++++++++
 2 files changed, 551 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac15742/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
index 43ed5ed..c75bd54 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
@@ -18,9 +18,12 @@ package org.apache.accumulo.core.iterators;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
@@ -30,36 +33,78 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * An iterator that handles "OR" query constructs on the server side. This code has been adapted/merged from Heap and Multi Iterators.
+ * An iterator that provides a sorted-iteration of column qualifiers for a set of column families in a row. It is important to note that this iterator
+ * <em>does not</em> adhere to the contract set forth by the {@link SortedKeyValueIterator}. It returns Keys in {@code row+colqual} order instead of
+ * {@code row+colfam+colqual} order. This is required for the implementation of this iterator (to work in conjunction with the {@code IntersectingIterator}) but
+ * is a code-smell. This iterator should only be used at query time, never at compaction time.
+ *
+ * The table structure should have the following form:
+ *
+ * <pre>
+ * row term:docId =&gt; value
+ * </pre>
+ *
+ * Users configuring this iterator must set the option {@link #COLUMNS_KEY}. This value is a comma-separated list of column families that should be "OR"'ed
+ * together.
+ *
+ * For example, given the following data and a value of {@code or.iterator.columns="steve,bob"} in the iterator options map:
+ *
+ * <pre>
+ * row1 bob:4
+ * row1 george:2
+ * row1 steve:3
+ * row2 bob:9
+ * row2 frank:8
+ * row2 steve:12
+ * row3 michael:15
+ * row3 steve:20
+ * </pre>
+ *
+ * Would return:
+ *
+ * <pre>
+ * row1 steve:3
+ * row1 bob:4
+ * row2 bob:9
+ * row2 steve:12
+ * row3 steve:20
+ * </pre>
  */
 
-public class OrIterator implements SortedKeyValueIterator<Key,Value> {
+public class OrIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+  private static final Logger LOG = LoggerFactory.getLogger(OrIterator.class);
+  public static final String COLUMNS_KEY = "or.iterator.columns";
 
   private TermSource currentTerm;
-  private ArrayList<TermSource> sources;
+  private List<TermSource> sources;
   private PriorityQueue<TermSource> sorted = new PriorityQueue<>(5);
-  private static final Text nullText = new Text();
-  private static final Key nullKey = new Key();
 
   protected static class TermSource implements Comparable<TermSource> {
-    public SortedKeyValueIterator<Key,Value> iter;
-    public Text term;
-    public Collection<ByteSequence> seekColfams;
+    private final SortedKeyValueIterator<Key,Value> iter;
+    private final Text term;
+    private final Collection<ByteSequence> seekColfams;
+    private Range currentRange;
 
     public TermSource(TermSource other) {
-      this.iter = other.iter;
-      this.term = other.term;
-      this.seekColfams = other.seekColfams;
+      this.iter = Objects.requireNonNull(other.iter);
+      this.term = Objects.requireNonNull(other.term);
+      this.seekColfams = Objects.requireNonNull(other.seekColfams);
+      this.currentRange = other.currentRange; // may be null: a source has no range until its first seek()
     }
 
     public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
-      this.iter = iter;
-      this.term = term;
+      this.iter = Objects.requireNonNull(iter);
+      this.term = Objects.requireNonNull(term);
       // The desired column families for this source is the term itself
       this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
+      // No current range until we're seek()'ed for the first time
+      this.currentRange = null;
     }
 
     @Override
@@ -69,7 +114,7 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
 
     @Override
     public boolean equals(Object obj) {
-      return obj == this || (obj != null && obj instanceof TermSource && 0 == compareTo((TermSource) obj));
+      return obj == this || (obj instanceof TermSource && 0 == compareTo((TermSource) obj));
     }
 
     @Override
@@ -80,17 +125,54 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
       // sorted after they have been determined to be valid.
       return this.iter.getTopKey().compareColumnQualifier(o.iter.getTopKey().getColumnQualifier());
     }
+
+    /**
+     * Converts the given {@code Range} into the correct {@code Range} for this TermSource (per this expected table structure) and then seeks this TermSource's
+     * SKVI.
+     */
+    public void seek(Range originalRange) throws IOException {
+      // the infinite start key is equivalent to a null startKey on the Range.
+      if (!originalRange.isInfiniteStartKey()) {
+        Key originalStartKey = originalRange.getStartKey();
+        // Pivot the provided range into the range for this term
+        Key newKey = new Key(originalStartKey.getRow(), term, originalStartKey.getColumnQualifier(), originalStartKey.getTimestamp());
+        // Construct the new range, preserving the other attributes on the provided range.
+        currentRange = new Range(newKey, originalRange.isStartKeyInclusive(), originalRange.getEndKey(), originalRange.isEndKeyInclusive());
+      } else {
+        currentRange = originalRange;
+      }
+      LOG.trace("Seeking {} to {}", this, currentRange);
+      iter.seek(currentRange, seekColfams, true);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("TermSource{term=").append(term).append(", currentRange=").append(currentRange).append("}");
+      return sb.toString();
+    }
+
+    /**
+     * @return True if there is a valid topKey which falls into the range this TermSource's iterator was last seeked to, false otherwise.
+     */
+    boolean hasEntryForTerm() {
+      if (!iter.hasTop()) {
+        return false;
+      }
+      return currentRange.contains(iter.getTopKey());
+    }
   }
 
   public OrIterator() {
-    this.sources = new ArrayList<>();
+    this.sources = Collections.emptyList();
   }
 
   private OrIterator(OrIterator other, IteratorEnvironment env) {
-    this.sources = new ArrayList<>();
+    ArrayList<TermSource> copiedSources = new ArrayList<>();
 
     for (TermSource TS : other.sources)
-      this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.term));
+      copiedSources.add(new TermSource(TS.iter.deepCopy(env), new Text(TS.term)));
+    this.sources = Collections.unmodifiableList(copiedSources);
   }
 
   @Override
@@ -98,41 +180,48 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
     return new OrIterator(this, env);
   }
 
-  public void addTerm(SortedKeyValueIterator<Key,Value> source, Text term, IteratorEnvironment env) {
-    this.sources.add(new TermSource(source.deepCopy(env), term));
+  public void setTerms(SortedKeyValueIterator<Key,Value> source, Collection<String> terms, IteratorEnvironment env) {
+    ArrayList<TermSource> newTerms = new ArrayList<>();
+    for (String term : terms) {
+      newTerms.add(new TermSource(source.deepCopy(env), new Text(term)));
+    }
+    this.sources = Collections.unmodifiableList(newTerms);
   }
 
   @Override
   final public void next() throws IOException {
-
+    LOG.trace("next()");
     if (currentTerm == null)
       return;
 
     // Advance currentTerm
     currentTerm.iter.next();
 
-    // See if currentTerm is still valid, remove if not
-    if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0)))
-      currentTerm = null;
+    // Avoid computing this multiple times
+    final boolean currentTermHasMoreEntries = currentTerm.hasEntryForTerm();
 
     // optimization.
     // if size == 0, currentTerm is the only item left,
     // OR there are no items left.
     // In either case, we don't need to use the PriorityQueue
-    if (sorted.size() > 0) {
-      // sort the term back in
-      if (currentTerm != null)
+    if (!sorted.isEmpty()) {
+      // Add the currentTerm back to the heap to let it sort it with the rest
+      if (currentTermHasMoreEntries) {
         sorted.add(currentTerm);
-      // and get the current top item out.
+      }
+      // Let the heap return the next value to inspect
       currentTerm = sorted.poll();
-    }
+    } else if (!currentTermHasMoreEntries) {
+      // This currentTerm source was our last TermSource and it ran out of results
+      currentTerm = null;
+    } // else, currentTerm is the last TermSource and it has more results
   }
 
   @Override
   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-
+    LOG.trace("seek() range={}", range);
     // If sources.size is 0, there is nothing to process, so just return.
-    if (sources.size() == 0) {
+    if (sources.isEmpty()) {
       currentTerm = null;
       return;
     }
@@ -141,32 +230,13 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
     // Yes, this is lots of duplicate code, but the speed works...
     // and we don't have a priority queue of size 0 or 1.
     if (sources.size() == 1) {
+      currentTerm = sources.get(0);
+      currentTerm.seek(range);
 
-      if (currentTerm == null)
-        currentTerm = sources.get(0);
-      Range newRange = null;
-
-      if (range != null) {
-        if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null))
-          newRange = range;
-        else {
-          Key newKey = null;
-          if (range.getStartKey().getColumnQualifier() == null)
-            newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term);
-          else
-            newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term, range.getStartKey().getColumnQualifier());
-          newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
-        }
-      }
-      currentTerm.iter.seek(newRange, currentTerm.seekColfams, true);
-
-      // If there is no top key
-      // OR we are:
-      // 1) NOT an iterator
-      // 2) we have seeked into the next term (ie: seek man, get man001)
-      // then ignore it as a valid source
-      if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0)))
+      if (!currentTerm.hasEntryForTerm()) {
+        // Signifies that there are no possible results for this range.
         currentTerm = null;
+      }
 
       // Otherwise, source is valid.
       return;
@@ -175,78 +245,69 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
     // Clear the PriorityQueue so that we can re-populate it.
     sorted.clear();
 
-    // This check is put in here to guard against the "initial seek"
-    // crashing us because the topkey term does not match.
-    // Note: It is safe to do the "sources.size() == 1" above
-    // because an Or must have at least two elements.
-    if (currentTerm == null) {
-      for (TermSource TS : sources) {
-        TS.iter.seek(range, TS.seekColfams, true);
-
-        if ((TS.iter.hasTop()) && ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) == 0)))
-          sorted.add(TS);
-      }
-      currentTerm = sorted.poll();
-      return;
-    }
-
-    TermSource TS = null;
     Iterator<TermSource> iter = sources.iterator();
     // For each term, seek forward.
     // if a hit is not found, delete it from future searches.
     while (iter.hasNext()) {
-      TS = iter.next();
-      Range newRange = null;
-
-      if (range != null) {
-        if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null))
-          newRange = range;
-        else {
-          Key newKey = null;
-          if (range.getStartKey().getColumnQualifier() == null)
-            newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term);
-          else
-            newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term, range.getStartKey().getColumnQualifier());
-          newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
-        }
+      TermSource ts = iter.next();
+      // Pivot the provided range into the correct range for this TermSource and seek the TS.
+      ts.seek(range);
+
+      if (ts.hasEntryForTerm()) {
+        LOG.trace("Retaining TermSource for {}", ts);
+        // This source has an entry for its term in the range; add it to the heap.
+        sorted.add(ts);
+      } else {
+        LOG.trace("Not adding TermSource to heap for {}", ts);
       }
-
-      // Seek only to the term for this source as a column family
-      TS.iter.seek(newRange, TS.seekColfams, true);
-
-      // If there is no top key
-      // OR we are:
-      // 1) NOT an iterator
-      // 2) we have seeked into the next term (ie: seek man, get man001)
-      // then ignore it as a valid source
-      if (!(TS.iter.hasTop()) || ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) != 0)))
-        iter.remove();
-
-      // Otherwise, source is valid. Add it to the sources.
-      sorted.add(TS);
     }
 
     // And set currentTerm = the next valid key/term.
+    // If the heap is empty, it returns null which signals iteration to cease
     currentTerm = sorted.poll();
   }
 
   @Override
   final public Key getTopKey() {
-    return currentTerm.iter.getTopKey();
+    final Key k = currentTerm.iter.getTopKey();
+    LOG.trace("getTopKey() = {}", k);
+    return k;
   }
 
   @Override
   final public Value getTopValue() {
-    return currentTerm.iter.getTopValue();
+    final Value v = currentTerm.iter.getTopValue();
+    LOG.trace("getTopValue() = {}", v);
+    return v;
   }
 
   @Override
   final public boolean hasTop() {
+    LOG.trace("hasTop()");
     return currentTerm != null;
   }
 
   @Override
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
-    throw new UnsupportedOperationException();
+    LOG.trace("init()");
+    String columnsValue = options.get(COLUMNS_KEY);
+    if (null == columnsValue) {
+      throw new IllegalArgumentException(COLUMNS_KEY + " was not provided in the iterator configuration");
+    }
+    String[] columns = StringUtils.split(columnsValue, ',');
+    setTerms(source, Arrays.asList(columns), env);
+    LOG.trace("Set sources: {}", this.sources);
+  }
+
+  @Override
+  public IteratorOptions describeOptions() {
+    Map<String,String> options = new HashMap<>();
+    options.put(COLUMNS_KEY, "A comma-separated list of families");
+    return new IteratorOptions("OrIterator", "Produces a sorted stream of qualifiers based on families", options, Collections.<String> emptyList());
+  }
+
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    return null != options.get(COLUMNS_KEY);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac15742/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java b/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java
new file mode 100644
index 0000000..d0fb12c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.OrIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class OrIteratorIT extends AccumuloClusterIT {
+  private static final String EMPTY = "";
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void testMultipleRowsInTablet() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    BatchWriter bw = null;
+    try {
+      bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+      Mutation m = new Mutation("row1");
+      m.put("bob", "2", EMPTY);
+      m.put("frank", "3", EMPTY);
+      m.put("steve", "1", EMPTY);
+      bw.addMutation(m);
+
+      m = new Mutation("row2");
+      m.put("bob", "7", EMPTY);
+      m.put("eddie", "4", EMPTY);
+      m.put("mort", "6", EMPTY);
+      m.put("zed", "5", EMPTY);
+      bw.addMutation(m);
+    } finally {
+      if (null != bw) {
+        bw.close();
+      }
+    }
+
+    IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+    is.addOption(OrIterator.COLUMNS_KEY, "mort,frank");
+    Map<String,String> expectedData = new HashMap<>();
+    expectedData.put("frank", "3");
+    expectedData.put("mort", "6");
+
+    BatchScanner bs = null;
+    try {
+      bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+      Set<Range> ranges = new HashSet<>(Arrays.asList(Range.exact("row1"), Range.exact("row2")));
+      bs.setRanges(ranges);
+      bs.addScanIterator(is);
+      for (Entry<Key,Value> entry : bs) {
+        String term = entry.getKey().getColumnFamily().toString();
+        String expectedDocId = expectedData.remove(term);
+        assertNotNull("Found unexpected term: " + term, expectedDocId);
+        assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+      }
+      assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+    } finally {
+      if (null != bs) {
+        bs.close();
+      }
+    }
+  }
+
+  @Test
+  public void testMultipleTablets() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    BatchWriter bw = null;
+    try {
+      bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+      Mutation m = new Mutation("row1");
+      m.put("bob", "2", EMPTY);
+      m.put("frank", "3", EMPTY);
+      m.put("steve", "1", EMPTY);
+      bw.addMutation(m);
+
+      m = new Mutation("row2");
+      m.put("bob", "7", EMPTY);
+      m.put("eddie", "4", EMPTY);
+      m.put("mort", "6", EMPTY);
+      m.put("zed", "5", EMPTY);
+      bw.addMutation(m);
+
+      m = new Mutation("row3");
+      m.put("carl", "9", EMPTY);
+      m.put("george", "8", EMPTY);
+      m.put("nick", "3", EMPTY);
+      m.put("zed", "1", EMPTY);
+      bw.addMutation(m);
+    } finally {
+      if (null != bw) {
+        bw.close();
+      }
+    }
+
+    conn.tableOperations().addSplits(tableName, new TreeSet<>(Arrays.asList(new Text("row2"), new Text("row3"))));
+
+    IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+    is.addOption(OrIterator.COLUMNS_KEY, "mort,frank,nick");
+    Map<String,String> expectedData = new HashMap<>();
+    expectedData.put("frank", "3");
+    expectedData.put("mort", "6");
+    expectedData.put("nick", "3");
+
+    BatchScanner bs = null;
+    try {
+      bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+      bs.setRanges(Collections.singleton(new Range()));
+      bs.addScanIterator(is);
+      for (Entry<Key,Value> entry : bs) {
+        String term = entry.getKey().getColumnFamily().toString();
+        String expectedDocId = expectedData.remove(term);
+        assertNotNull("Found unexpected term: " + term, expectedDocId);
+        assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+      }
+      assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+    } finally {
+      if (null != bs) {
+        bs.close();
+      }
+    }
+  }
+
+  @Test
+  public void testSingleLargeRow() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().setProperty(tableName, Property.TABLE_SCAN_MAXMEM.getKey(), "1");
+
+    BatchWriter bw = null;
+    try {
+      bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+      Mutation m = new Mutation("row1");
+      m.put("bob", "02", EMPTY);
+      m.put("carl", "07", EMPTY);
+      m.put("eddie", "04", EMPTY);
+      m.put("frank", "03", EMPTY);
+      m.put("greg", "15", EMPTY);
+      m.put("mort", "06", EMPTY);
+      m.put("nick", "12", EMPTY);
+      m.put("richard", "18", EMPTY);
+      m.put("steve", "01", EMPTY);
+      m.put("ted", "11", EMPTY);
+      m.put("zed", "05", EMPTY);
+      bw.addMutation(m);
+    } finally {
+      if (null != bw) {
+        bw.close();
+      }
+    }
+
+    IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+    is.addOption(OrIterator.COLUMNS_KEY, "richard,carl,frank,nick,eddie,zed");
+    Map<String,String> expectedData = new HashMap<>();
+    expectedData.put("frank", "03");
+    expectedData.put("eddie", "04");
+    expectedData.put("zed", "05");
+    expectedData.put("carl", "07");
+    expectedData.put("nick", "12");
+    expectedData.put("richard", "18");
+
+    BatchScanner bs = null;
+    try {
+      bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+      bs.setRanges(Collections.singleton(new Range()));
+      bs.addScanIterator(is);
+      for (Entry<Key,Value> entry : bs) {
+        String term = entry.getKey().getColumnFamily().toString();
+        String expectedDocId = expectedData.remove(term);
+        assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId);
+        assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+      }
+      assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+    } finally {
+      if (null != bs) {
+        bs.close();
+      }
+    }
+  }
+
+  @Test
+  public void testNoMatchesForTable() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    BatchWriter bw = null;
+    try {
+      bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+      Mutation m = new Mutation("row1");
+      m.put("bob", "02", EMPTY);
+      m.put("carl", "07", EMPTY);
+      m.put("eddie", "04", EMPTY);
+      m.put("frank", "03", EMPTY);
+      m.put("greg", "15", EMPTY);
+      m.put("mort", "06", EMPTY);
+      m.put("nick", "12", EMPTY);
+      m.put("richard", "18", EMPTY);
+      m.put("steve", "01", EMPTY);
+      m.put("ted", "11", EMPTY);
+      m.put("zed", "05", EMPTY);
+      bw.addMutation(m);
+    } finally {
+      if (null != bw) {
+        bw.close();
+      }
+    }
+
+    IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+    is.addOption(OrIterator.COLUMNS_KEY, "theresa,sally");
+    Map<String,String> expectedData = Collections.emptyMap();
+
+    BatchScanner bs = null;
+    try {
+      bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+      bs.setRanges(Collections.singleton(new Range()));
+      bs.addScanIterator(is);
+      for (Entry<Key,Value> entry : bs) {
+        String term = entry.getKey().getColumnFamily().toString();
+        String expectedDocId = expectedData.remove(term);
+        assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId);
+        assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+      }
+      assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+    } finally {
+      if (null != bs) {
+        bs.close();
+      }
+    }
+  }
+
+  @Test
+  public void testNoMatchesInSingleTablet() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    BatchWriter bw = null;
+    try {
+      bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+      Mutation m = new Mutation("row1");
+      m.put("bob", "02", EMPTY);
+      m.put("carl", "07", EMPTY);
+      m.put("eddie", "04", EMPTY);
+      bw.addMutation(m);
+
+      m = new Mutation("row2");
+      m.put("frank", "03", EMPTY);
+      m.put("greg", "15", EMPTY);
+      m.put("mort", "06", EMPTY);
+      m.put("nick", "12", EMPTY);
+      bw.addMutation(m);
+
+      m = new Mutation("row3");
+      m.put("richard", "18", EMPTY);
+      m.put("steve", "01", EMPTY);
+      m.put("ted", "11", EMPTY);
+      m.put("zed", "05", EMPTY);
+      bw.addMutation(m);
+    } finally {
+      if (null != bw) {
+        bw.close();
+      }
+    }
+
+    IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+    is.addOption(OrIterator.COLUMNS_KEY, "bob,eddie,steve,zed");
+    Map<String,String> expectedData = new HashMap<>();
+    expectedData.put("bob", "02");
+    expectedData.put("eddie", "04");
+    expectedData.put("zed", "05");
+    expectedData.put("steve", "01");
+
+    // Split each row into its own tablet
+    conn.tableOperations().addSplits(tableName, new TreeSet<>(Arrays.asList(new Text("row2"), new Text("row3"))));
+
+    BatchScanner bs = null;
+    try {
+      bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+      bs.setRanges(Collections.singleton(new Range()));
+      bs.addScanIterator(is);
+      for (Entry<Key,Value> entry : bs) {
+        String term = entry.getKey().getColumnFamily().toString();
+        String expectedDocId = expectedData.remove(term);
+        assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId);
+        assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+      }
+      assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+    } finally {
+      if (null != bs) {
+        bs.close();
+      }
+    }
+  }
+
+  @Test
+  public void testResultOrder() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    BatchWriter bw = null;
+    try {
+      bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+      Mutation m = new Mutation("row1");
+      m.put("bob", "2", EMPTY);
+      m.put("frank", "3", EMPTY);
+      m.put("steve", "1", EMPTY);
+      bw.addMutation(m);
+    } finally {
+      if (null != bw) {
+        bw.close();
+      }
+    }
+
+    IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+    is.addOption(OrIterator.COLUMNS_KEY, "bob,steve");
+
+    Scanner s = null;
+    try {
+      s = conn.createScanner(tableName, Authorizations.EMPTY);
+      s.addScanIterator(is);
+      Iterator<Entry<Key,Value>> iter = s.iterator();
+      assertTrue(iter.hasNext());
+      Key k = iter.next().getKey();
+      assertEquals("Actual key was " + k, 0, k.compareTo(new Key("row1", "steve", "1"), PartialKey.ROW_COLFAM_COLQUAL));
+      assertTrue(iter.hasNext());
+      k = iter.next().getKey();
+      assertEquals("Actual key was " + k, 0, k.compareTo(new Key("row1", "bob", "2"), PartialKey.ROW_COLFAM_COLQUAL));
+      assertFalse(iter.hasNext());
+    } finally {
+      if (null != s) {
+        s.close();
+      }
+    }
+  }
+}