You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2017/04/13 18:13:41 UTC
[2/6] accumulo git commit: ACCUMULO-3208 Integration test for the OrIterator and cleanup
ACCUMULO-3208 Integration test for the OrIterator and cleanup
The OrIterator was in very bad shape, with next-to-no documentation
about what it actually does.
Closes apache/accumulo#247
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5ac15742
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5ac15742
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5ac15742
Branch: refs/heads/1.8
Commit: 5ac1574243f4445399dbc76da9392fb393f7f69e
Parents: 8c0f03a
Author: Josh Elser <el...@apache.org>
Authored: Sun Apr 9 22:45:56 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Apr 13 12:42:38 2017 -0400
----------------------------------------------------------------------
.../accumulo/core/iterators/OrIterator.java | 263 ++++++++-----
.../org/apache/accumulo/test/OrIteratorIT.java | 389 +++++++++++++++++++
2 files changed, 551 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac15742/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
index 43ed5ed..c75bd54 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
@@ -18,9 +18,12 @@ package org.apache.accumulo.core.iterators;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
@@ -30,36 +33,78 @@ import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * An iterator that handles "OR" query constructs on the server side. This code has been adapted/merged from Heap and Multi Iterators.
+ * An iterator that provides a sorted-iteration of column qualifiers for a set of column families in a row. It is important to note that this iterator
+ * <em>does not</em> adhere to the contract set forth by the {@link SortedKeyValueIterator}. It returns Keys in {@code row+colqual} order instead of
+ * {@code row+colfam+colqual} order. This is required for the implementation of this iterator (to work in conjunction with the {@code IntersectingIterator}) but
+ * is a code-smell. This iterator should only be used at query time, never at compaction time.
+ *
+ * The table structure should have the following form:
+ *
+ * <pre>
+ * row term:docId => value
+ * </pre>
+ *
+ * Users configuring this iterator must set the option {@link #COLUMNS_KEY}. This value is a comma-separated list of column families that should be "OR"'ed
+ * together.
+ *
+ * For example, given the following data and a value of {@code or.iterator.columns="steve,bob"} in the iterator options map:
+ *
+ * <pre>
+ * row1 bob:4
+ * row1 george:2
+ * row1 steve:3
+ * row2 bob:9
+ * row2 frank:8
+ * row2 steve:12
+ * row3 michael:15
+ * row3 steve:20
+ * </pre>
+ *
+ * The iterator would return:
+ *
+ * <pre>
+ * row1 steve:3
+ * row1 bob:4
+ * row2 bob:9
+ * row2 steve:12
+ * row3 steve:20
+ * </pre>
*/
-public class OrIterator implements SortedKeyValueIterator<Key,Value> {
+public class OrIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+ private static final Logger LOG = LoggerFactory.getLogger(OrIterator.class);
+ public static final String COLUMNS_KEY = "or.iterator.columns";
private TermSource currentTerm;
- private ArrayList<TermSource> sources;
+ private List<TermSource> sources;
private PriorityQueue<TermSource> sorted = new PriorityQueue<>(5);
- private static final Text nullText = new Text();
- private static final Key nullKey = new Key();
protected static class TermSource implements Comparable<TermSource> {
- public SortedKeyValueIterator<Key,Value> iter;
- public Text term;
- public Collection<ByteSequence> seekColfams;
+ private final SortedKeyValueIterator<Key,Value> iter;
+ private final Text term;
+ private final Collection<ByteSequence> seekColfams;
+ private Range currentRange;
public TermSource(TermSource other) {
- this.iter = other.iter;
- this.term = other.term;
- this.seekColfams = other.seekColfams;
+ this.iter = Objects.requireNonNull(other.iter);
+ this.term = Objects.requireNonNull(other.term);
+ this.seekColfams = Objects.requireNonNull(other.seekColfams);
+ this.currentRange = Objects.requireNonNull(other.currentRange);
}
public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
- this.iter = iter;
- this.term = term;
+ this.iter = Objects.requireNonNull(iter);
+ this.term = Objects.requireNonNull(term);
// The desired column families for this source is the term itself
this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
+ // No current range until we're seek()'ed for the first time
+ this.currentRange = null;
}
@Override
@@ -69,7 +114,7 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
@Override
public boolean equals(Object obj) {
- return obj == this || (obj != null && obj instanceof TermSource && 0 == compareTo((TermSource) obj));
+ return obj == this || (obj instanceof TermSource && 0 == compareTo((TermSource) obj));
}
@Override
@@ -80,17 +125,54 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
// sorted after they have been determined to be valid.
return this.iter.getTopKey().compareColumnQualifier(o.iter.getTopKey().getColumnQualifier());
}
+
+ /**
+ * Converts the given {@code Range} into the correct {@code Range} for this TermSource (per the expected table structure) and then seeks this TermSource's
+ * underlying {@link SortedKeyValueIterator}.
+ */
+ public void seek(Range originalRange) throws IOException {
+ // the infinite start key is equivalent to a null startKey on the Range.
+ if (!originalRange.isInfiniteStartKey()) {
+ Key originalStartKey = originalRange.getStartKey();
+ // Pivot the provided range into the range for this term
+ Key newKey = new Key(originalStartKey.getRow(), term, originalStartKey.getColumnQualifier(), originalStartKey.getTimestamp());
+ // Construct the new range, preserving the other attributes on the provided range.
+ currentRange = new Range(newKey, originalRange.isStartKeyInclusive(), originalRange.getEndKey(), originalRange.isEndKeyInclusive());
+ } else {
+ currentRange = originalRange;
+ }
+ LOG.trace("Seeking {} to {}", this, currentRange);
+ iter.seek(currentRange, seekColfams, true);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("TermSource{term=").append(term).append(", currentRange=").append(currentRange).append("}");
+ return sb.toString();
+ }
+
+ /**
+ * @return True if there is a valid topKey which falls into the range this TermSource's iterator was last seeked to, false otherwise.
+ */
+ boolean hasEntryForTerm() {
+ if (!iter.hasTop()) {
+ return false;
+ }
+ return currentRange.contains(iter.getTopKey());
+ }
}
public OrIterator() {
- this.sources = new ArrayList<>();
+ this.sources = Collections.emptyList();
}
private OrIterator(OrIterator other, IteratorEnvironment env) {
- this.sources = new ArrayList<>();
+ ArrayList<TermSource> copiedSources = new ArrayList<>();
for (TermSource TS : other.sources)
- this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.term));
+ copiedSources.add(new TermSource(TS.iter.deepCopy(env), new Text(TS.term)));
+ this.sources = Collections.unmodifiableList(copiedSources);
}
@Override
@@ -98,41 +180,48 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
return new OrIterator(this, env);
}
- public void addTerm(SortedKeyValueIterator<Key,Value> source, Text term, IteratorEnvironment env) {
- this.sources.add(new TermSource(source.deepCopy(env), term));
+ public void setTerms(SortedKeyValueIterator<Key,Value> source, Collection<String> terms, IteratorEnvironment env) {
+ ArrayList<TermSource> newTerms = new ArrayList<>();
+ for (String term : terms) {
+ newTerms.add(new TermSource(source.deepCopy(env), new Text(term)));
+ }
+ this.sources = Collections.unmodifiableList(newTerms);
}
@Override
final public void next() throws IOException {
-
+ LOG.trace("next()");
if (currentTerm == null)
return;
// Advance currentTerm
currentTerm.iter.next();
- // See if currentTerm is still valid, remove if not
- if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0)))
- currentTerm = null;
+ // Avoid computing this multiple times
+ final boolean currentTermHasMoreEntries = currentTerm.hasEntryForTerm();
// optimization.
// if size == 0, currentTerm is the only item left,
// OR there are no items left.
// In either case, we don't need to use the PriorityQueue
- if (sorted.size() > 0) {
- // sort the term back in
- if (currentTerm != null)
+ if (!sorted.isEmpty()) {
+ // Add the currentTerm back to the heap to let it sort it with the rest
+ if (currentTermHasMoreEntries) {
sorted.add(currentTerm);
- // and get the current top item out.
+ }
+ // Let the heap return the next value to inspect
currentTerm = sorted.poll();
- }
+ } else if (!currentTermHasMoreEntries) {
+ // This currentTerm source was our last TermSource and it ran out of results
+ currentTerm = null;
+ } // else, currentTerm is the last TermSource and it has more results
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-
+ LOG.trace("seek() range={}", range);
// If sources.size is 0, there is nothing to process, so just return.
- if (sources.size() == 0) {
+ if (sources.isEmpty()) {
currentTerm = null;
return;
}
@@ -141,32 +230,13 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
// Yes, this is lots of duplicate code, but the speed works...
// and we don't have a priority queue of size 0 or 1.
if (sources.size() == 1) {
+ currentTerm = sources.get(0);
+ currentTerm.seek(range);
- if (currentTerm == null)
- currentTerm = sources.get(0);
- Range newRange = null;
-
- if (range != null) {
- if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null))
- newRange = range;
- else {
- Key newKey = null;
- if (range.getStartKey().getColumnQualifier() == null)
- newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term);
- else
- newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term, range.getStartKey().getColumnQualifier());
- newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
- }
- }
- currentTerm.iter.seek(newRange, currentTerm.seekColfams, true);
-
- // If there is no top key
- // OR we are:
- // 1) NOT an iterator
- // 2) we have seeked into the next term (ie: seek man, get man001)
- // then ignore it as a valid source
- if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0)))
+ if (!currentTerm.hasEntryForTerm()) {
+ // Signifies that there are no possible results for this range.
currentTerm = null;
+ }
// Otherwise, source is valid.
return;
@@ -175,78 +245,69 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> {
// Clear the PriorityQueue so that we can re-populate it.
sorted.clear();
- // This check is put in here to guard against the "initial seek"
- // crashing us because the topkey term does not match.
- // Note: It is safe to do the "sources.size() == 1" above
- // because an Or must have at least two elements.
- if (currentTerm == null) {
- for (TermSource TS : sources) {
- TS.iter.seek(range, TS.seekColfams, true);
-
- if ((TS.iter.hasTop()) && ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) == 0)))
- sorted.add(TS);
- }
- currentTerm = sorted.poll();
- return;
- }
-
- TermSource TS = null;
Iterator<TermSource> iter = sources.iterator();
// For each term, seek forward.
// if a hit is not found, delete it from future searches.
while (iter.hasNext()) {
- TS = iter.next();
- Range newRange = null;
-
- if (range != null) {
- if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null))
- newRange = range;
- else {
- Key newKey = null;
- if (range.getStartKey().getColumnQualifier() == null)
- newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term);
- else
- newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term, range.getStartKey().getColumnQualifier());
- newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
- }
+ TermSource ts = iter.next();
+ // Pivot the provided range into the correct range for this TermSource and seek the TS.
+ ts.seek(range);
+
+ if (ts.hasEntryForTerm()) {
+ LOG.trace("Retaining TermSource for {}", ts);
+ // The source has an entry within the range, so add it to the heap.
+ sorted.add(ts);
+ } else {
+ LOG.trace("Not adding TermSource to heap for {}", ts);
}
-
- // Seek only to the term for this source as a column family
- TS.iter.seek(newRange, TS.seekColfams, true);
-
- // If there is no top key
- // OR we are:
- // 1) NOT an iterator
- // 2) we have seeked into the next term (ie: seek man, get man001)
- // then ignore it as a valid source
- if (!(TS.iter.hasTop()) || ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) != 0)))
- iter.remove();
-
- // Otherwise, source is valid. Add it to the sources.
- sorted.add(TS);
}
// And set currentTerm = the next valid key/term.
+ // If the heap is empty, it returns null which signals iteration to cease
currentTerm = sorted.poll();
}
@Override
final public Key getTopKey() {
- return currentTerm.iter.getTopKey();
+ final Key k = currentTerm.iter.getTopKey();
+ LOG.trace("getTopKey() = {}", k);
+ return k;
}
@Override
final public Value getTopValue() {
- return currentTerm.iter.getTopValue();
+ final Value v = currentTerm.iter.getTopValue();
+ LOG.trace("getTopValue() = {}", v);
+ return v;
}
@Override
final public boolean hasTop() {
+ LOG.trace("hasTop()");
return currentTerm != null;
}
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- throw new UnsupportedOperationException();
+ LOG.trace("init()");
+ String columnsValue = options.get(COLUMNS_KEY);
+ if (null == columnsValue) {
+ throw new IllegalArgumentException(COLUMNS_KEY + " was not provided in the iterator configuration");
+ }
+ String[] columns = StringUtils.split(columnsValue, ',');
+ setTerms(source, Arrays.asList(columns), env);
+ LOG.trace("Set sources: {}", this.sources);
+ }
+
+ @Override
+ public IteratorOptions describeOptions() {
+ Map<String,String> options = new HashMap<>();
+ options.put(COLUMNS_KEY, "A comma-separated list of families");
+ return new IteratorOptions("OrIterator", "Produces a sorted stream of qualifiers based on families", options, Collections.<String> emptyList());
+ }
+
+ @Override
+ public boolean validateOptions(Map<String,String> options) {
+ return null != options.get(COLUMNS_KEY);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac15742/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java b/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java
new file mode 100644
index 0000000..d0fb12c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.OrIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class OrIteratorIT extends AccumuloClusterIT {
+ private static final String EMPTY = "";
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
+ public void testMultipleRowsInTablet() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+
+ BatchWriter bw = null;
+ try {
+ bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row1");
+ m.put("bob", "2", EMPTY);
+ m.put("frank", "3", EMPTY);
+ m.put("steve", "1", EMPTY);
+ bw.addMutation(m);
+
+ m = new Mutation("row2");
+ m.put("bob", "7", EMPTY);
+ m.put("eddie", "4", EMPTY);
+ m.put("mort", "6", EMPTY);
+ m.put("zed", "5", EMPTY);
+ bw.addMutation(m);
+ } finally {
+ if (null != bw) {
+ bw.close();
+ }
+ }
+
+ IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+ is.addOption(OrIterator.COLUMNS_KEY, "mort,frank");
+ Map<String,String> expectedData = new HashMap<>();
+ expectedData.put("frank", "3");
+ expectedData.put("mort", "6");
+
+ BatchScanner bs = null;
+ try {
+ bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+ Set<Range> ranges = new HashSet<>(Arrays.asList(Range.exact("row1"), Range.exact("row2")));
+ bs.setRanges(ranges);
+ bs.addScanIterator(is);
+ for (Entry<Key,Value> entry : bs) {
+ String term = entry.getKey().getColumnFamily().toString();
+ String expectedDocId = expectedData.remove(term);
+ assertNotNull("Found unexpected term: " + term, expectedDocId);
+ assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+ }
+ assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+ } finally {
+ if (null != bs) {
+ bs.close();
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleTablets() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+
+ BatchWriter bw = null;
+ try {
+ bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row1");
+ m.put("bob", "2", EMPTY);
+ m.put("frank", "3", EMPTY);
+ m.put("steve", "1", EMPTY);
+ bw.addMutation(m);
+
+ m = new Mutation("row2");
+ m.put("bob", "7", EMPTY);
+ m.put("eddie", "4", EMPTY);
+ m.put("mort", "6", EMPTY);
+ m.put("zed", "5", EMPTY);
+ bw.addMutation(m);
+
+ m = new Mutation("row3");
+ m.put("carl", "9", EMPTY);
+ m.put("george", "8", EMPTY);
+ m.put("nick", "3", EMPTY);
+ m.put("zed", "1", EMPTY);
+ bw.addMutation(m);
+ } finally {
+ if (null != bw) {
+ bw.close();
+ }
+ }
+
+ conn.tableOperations().addSplits(tableName, new TreeSet<>(Arrays.asList(new Text("row2"), new Text("row3"))));
+
+ IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+ is.addOption(OrIterator.COLUMNS_KEY, "mort,frank,nick");
+ Map<String,String> expectedData = new HashMap<>();
+ expectedData.put("frank", "3");
+ expectedData.put("mort", "6");
+ expectedData.put("nick", "3");
+
+ BatchScanner bs = null;
+ try {
+ bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+ bs.setRanges(Collections.singleton(new Range()));
+ bs.addScanIterator(is);
+ for (Entry<Key,Value> entry : bs) {
+ String term = entry.getKey().getColumnFamily().toString();
+ String expectedDocId = expectedData.remove(term);
+ assertNotNull("Found unexpected term: " + term, expectedDocId);
+ assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+ }
+ assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+ } finally {
+ if (null != bs) {
+ bs.close();
+ }
+ }
+ }
+
+ @Test
+ public void testSingleLargeRow() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+ conn.tableOperations().setProperty(tableName, Property.TABLE_SCAN_MAXMEM.getKey(), "1");
+
+ BatchWriter bw = null;
+ try {
+ bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row1");
+ m.put("bob", "02", EMPTY);
+ m.put("carl", "07", EMPTY);
+ m.put("eddie", "04", EMPTY);
+ m.put("frank", "03", EMPTY);
+ m.put("greg", "15", EMPTY);
+ m.put("mort", "06", EMPTY);
+ m.put("nick", "12", EMPTY);
+ m.put("richard", "18", EMPTY);
+ m.put("steve", "01", EMPTY);
+ m.put("ted", "11", EMPTY);
+ m.put("zed", "05", EMPTY);
+ bw.addMutation(m);
+ } finally {
+ if (null != bw) {
+ bw.close();
+ }
+ }
+
+ IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+ is.addOption(OrIterator.COLUMNS_KEY, "richard,carl,frank,nick,eddie,zed");
+ Map<String,String> expectedData = new HashMap<>();
+ expectedData.put("frank", "03");
+ expectedData.put("eddie", "04");
+ expectedData.put("zed", "05");
+ expectedData.put("carl", "07");
+ expectedData.put("nick", "12");
+ expectedData.put("richard", "18");
+
+ BatchScanner bs = null;
+ try {
+ bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+ bs.setRanges(Collections.singleton(new Range()));
+ bs.addScanIterator(is);
+ for (Entry<Key,Value> entry : bs) {
+ String term = entry.getKey().getColumnFamily().toString();
+ String expectedDocId = expectedData.remove(term);
+ assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId);
+ assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+ }
+ assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+ } finally {
+ if (null != bs) {
+ bs.close();
+ }
+ }
+ }
+
+ @Test
+ public void testNoMatchesForTable() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+
+ BatchWriter bw = null;
+ try {
+ bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row1");
+ m.put("bob", "02", EMPTY);
+ m.put("carl", "07", EMPTY);
+ m.put("eddie", "04", EMPTY);
+ m.put("frank", "03", EMPTY);
+ m.put("greg", "15", EMPTY);
+ m.put("mort", "06", EMPTY);
+ m.put("nick", "12", EMPTY);
+ m.put("richard", "18", EMPTY);
+ m.put("steve", "01", EMPTY);
+ m.put("ted", "11", EMPTY);
+ m.put("zed", "05", EMPTY);
+ bw.addMutation(m);
+ } finally {
+ if (null != bw) {
+ bw.close();
+ }
+ }
+
+ IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+ is.addOption(OrIterator.COLUMNS_KEY, "theresa,sally");
+ Map<String,String> expectedData = Collections.emptyMap();
+
+ BatchScanner bs = null;
+ try {
+ bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+ bs.setRanges(Collections.singleton(new Range()));
+ bs.addScanIterator(is);
+ for (Entry<Key,Value> entry : bs) {
+ String term = entry.getKey().getColumnFamily().toString();
+ String expectedDocId = expectedData.remove(term);
+ assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId);
+ assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+ }
+ assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+ } finally {
+ if (null != bs) {
+ bs.close();
+ }
+ }
+ }
+
+ @Test
+ public void testNoMatchesInSingleTablet() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+
+ BatchWriter bw = null;
+ try {
+ bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row1");
+ m.put("bob", "02", EMPTY);
+ m.put("carl", "07", EMPTY);
+ m.put("eddie", "04", EMPTY);
+ bw.addMutation(m);
+
+ m = new Mutation("row2");
+ m.put("frank", "03", EMPTY);
+ m.put("greg", "15", EMPTY);
+ m.put("mort", "06", EMPTY);
+ m.put("nick", "12", EMPTY);
+ bw.addMutation(m);
+
+ m = new Mutation("row3");
+ m.put("richard", "18", EMPTY);
+ m.put("steve", "01", EMPTY);
+ m.put("ted", "11", EMPTY);
+ m.put("zed", "05", EMPTY);
+ bw.addMutation(m);
+ } finally {
+ if (null != bw) {
+ bw.close();
+ }
+ }
+
+ IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+ is.addOption(OrIterator.COLUMNS_KEY, "bob,eddie,steve,zed");
+ Map<String,String> expectedData = new HashMap<>();
+ expectedData.put("bob", "02");
+ expectedData.put("eddie", "04");
+ expectedData.put("zed", "05");
+ expectedData.put("steve", "01");
+
+ // Split each row into its own tablet
+ conn.tableOperations().addSplits(tableName, new TreeSet<>(Arrays.asList(new Text("row2"), new Text("row3"))));
+
+ BatchScanner bs = null;
+ try {
+ bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
+ bs.setRanges(Collections.singleton(new Range()));
+ bs.addScanIterator(is);
+ for (Entry<Key,Value> entry : bs) {
+ String term = entry.getKey().getColumnFamily().toString();
+ String expectedDocId = expectedData.remove(term);
+ assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId);
+ assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString());
+ }
+ assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty());
+ } finally {
+ if (null != bs) {
+ bs.close();
+ }
+ }
+ }
+
+ @Test
+ public void testResultOrder() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+
+ BatchWriter bw = null;
+ try {
+ bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row1");
+ m.put("bob", "2", EMPTY);
+ m.put("frank", "3", EMPTY);
+ m.put("steve", "1", EMPTY);
+ bw.addMutation(m);
+ } finally {
+ if (null != bw) {
+ bw.close();
+ }
+ }
+
+ IteratorSetting is = new IteratorSetting(50, OrIterator.class);
+ is.addOption(OrIterator.COLUMNS_KEY, "bob,steve");
+
+ Scanner s = null;
+ try {
+ s = conn.createScanner(tableName, Authorizations.EMPTY);
+ s.addScanIterator(is);
+ Iterator<Entry<Key,Value>> iter = s.iterator();
+ assertTrue(iter.hasNext());
+ Key k = iter.next().getKey();
+ assertEquals("Actual key was " + k, 0, k.compareTo(new Key("row1", "steve", "1"), PartialKey.ROW_COLFAM_COLQUAL));
+ assertTrue(iter.hasNext());
+ k = iter.next().getKey();
+ assertEquals("Actual key was " + k, 0, k.compareTo(new Key("row1", "bob", "2"), PartialKey.ROW_COLFAM_COLQUAL));
+ assertFalse(iter.hasNext());
+ } finally {
+ if (null != s) {
+ s.close();
+ }
+ }
+ }
+}