You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/07/05 23:04:29 UTC
svn commit: r1357915 - in
/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators:
OrIterator.java SortedKeyValueIterator.java user/IndexedDocIterator.java
user/IntersectingIterator.java
Author: ecn
Date: Thu Jul 5 21:04:28 2012
New Revision: 1357915
URL: http://svn.apache.org/viewvc?rev=1357915&view=rev
Log:
ACCUMULO-665: patch from Josh Elser to properly pass column family information for intersecting iterators
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java Thu Jul 5 21:04:28 2012
@@ -19,10 +19,12 @@ package org.apache.accumulo.core.iterato
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
@@ -47,15 +49,19 @@ public class OrIterator implements Sorte
protected static class TermSource implements Comparable<TermSource> {
public SortedKeyValueIterator<Key,Value> iter;
public Text term;
+ public Collection<ByteSequence> seekColfams;
public TermSource(TermSource other) {
this.iter = other.iter;
this.term = other.term;
+ this.seekColfams = other.seekColfams;
}
public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
this.iter = iter;
this.term = term;
+ // The desired column family for this source is the term itself
+ this.seekColfams = Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
}
public int compareTo(TermSource o) {
@@ -143,7 +149,7 @@ public class OrIterator implements Sorte
newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
}
}
- currentTerm.iter.seek(newRange, columnFamilies, inclusive);
+ currentTerm.iter.seek(newRange, currentTerm.seekColfams, true);
// If there is no top key
// OR we are:
@@ -166,7 +172,7 @@ public class OrIterator implements Sorte
// because an Or must have at least two elements.
if (currentTerm == null) {
for (TermSource TS : sources) {
- TS.iter.seek(range, columnFamilies, inclusive);
+ TS.iter.seek(range, TS.seekColfams, true);
if ((TS.iter.hasTop()) && ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) == 0)))
sorted.add(TS);
@@ -196,7 +202,8 @@ public class OrIterator implements Sorte
}
}
- TS.iter.seek(newRange, columnFamilies, inclusive);
+ // Seek only to the term for this source as a column family
+ TS.iter.seek(newRange, TS.seekColfams, true);
// If there is no top key
// OR we are:
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java Thu Jul 5 21:04:28 2012
@@ -81,6 +81,13 @@ public interface SortedKeyValueIterator<
* Iterators that examine groups of adjacent key/value pairs (e.g. rows) to determine their top key and value should be sure that they properly handle a seek
* to a key in the middle of such a group (e.g. the middle of a row). Even if the client always seeks to a range containing an entire group (a,c), the tablet
* server could send back a batch of entries corresponding to (a,b], then reseek the iterator to range (b,c) when the scan is continued.
+ *
+ * {@code columnFamilies} is used, at the lowest level, to determine which data blocks inside of an RFile need to be opened for this iterator. This set of data
+ * blocks corresponds to the set of locality groups defined for the given table. If no columnFamilies are provided, the data blocks for all locality groups inside of
+ * the correct RFile will be opened and seeked in an attempt to find the correct start key, regardless of the startKey in the {@code range}.
+ *
+ * In an Accumulo instance in which multiple locality groups exist for a table, it is important to ensure that {@code columnFamilies} is properly set to the
+ * minimum required column families to ensure that data from separate locality groups is not inadvertently read.
*
* @param range
* <tt>Range</tt> of keys to iterate over.
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java Thu Jul 5 21:04:28 2012
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Ran
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator.TermSource;
import org.apache.hadoop.io.Text;
/**
@@ -134,6 +135,10 @@ public class IndexedDocIterator extends
docColf = new Text(options.get(docFamilyOptionName));
docSource = source.deepCopy(env);
indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(indexColf.getBytes(), 0, indexColf.getLength()));
+
+ for (TermSource ts : this.sources) {
+ ts.seekColfams = indexColfSet;
+ }
}
@Override
@@ -143,7 +148,7 @@ public class IndexedDocIterator extends
@Override
public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
- super.seek(range, indexColfSet, true);
+ super.seek(range, null, true);
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java Thu Jul 5 21:04:28 2012
@@ -18,9 +18,11 @@ package org.apache.accumulo.core.iterato
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
@@ -48,6 +50,10 @@ import org.apache.log4j.Logger;
*
* This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
*
+ * This iterator will <em>ignore</em> any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections
+ * over terms. Extending classes should instead set the {@link TermSource#seekColfams} field of each source in their implementation's
+ * {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method.
+ *
* README.shard in docs/examples shows an example of using the IntersectingIterator.
*/
public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> {
@@ -83,24 +89,26 @@ public class IntersectingIterator implem
protected static class TermSource {
public SortedKeyValueIterator<Key,Value> iter;
public Text term;
+ public Collection<ByteSequence> seekColfams;
public boolean notFlag;
public TermSource(TermSource other) {
this.iter = other.iter;
this.term = other.term;
this.notFlag = other.notFlag;
+ this.seekColfams = other.seekColfams;
}
public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
- this.iter = iter;
- this.term = term;
- this.notFlag = false;
+ this(iter, term, false);
}
public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) {
this.iter = iter;
this.term = term;
this.notFlag = notFlag;
+ // The desired column family for this source is the term itself
+ this.seekColfams = Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
}
public String getTermString() {
@@ -121,10 +129,6 @@ public class IntersectingIterator implem
protected Key topKey = null;
protected Value value = new Value(emptyByteArray);
- protected Collection<ByteSequence> seekColumnFamilies;
-
- protected boolean inclusive;
-
public IntersectingIterator() {}
@Override
@@ -196,7 +200,7 @@ public class IntersectingIterator implem
if (partitionCompare > 0) {
// seek to at least the currentRow
Key seekKey = buildKey(currentPartition, sources[sourceID].term);
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
continue;
}
// check if this source has gone beyond currentRow
@@ -213,7 +217,7 @@ public class IntersectingIterator implem
// if not, then seek forwards to the right columnFamily
if (termCompare > 0) {
Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
continue;
}
// check if this source is beyond the right columnFamily
@@ -235,7 +239,7 @@ public class IntersectingIterator implem
if (docIDCompare > 0) {
// seek forwards
Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
continue;
}
// if we are equal to the target, this is an invalid result.
@@ -273,7 +277,7 @@ public class IntersectingIterator implem
if (partitionCompare > 0) {
// seek to at least the currentRow
Key seekKey = buildKey(currentPartition, sources[sourceID].term);
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
continue;
}
// check if this source has gone beyond currentRow
@@ -294,7 +298,7 @@ public class IntersectingIterator implem
// if not, then seek forwards to the right columnFamily
if (termCompare > 0) {
Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
continue;
}
// check if this source is beyond the right columnFamily
@@ -314,7 +318,7 @@ public class IntersectingIterator implem
return true;
}
Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
continue;
}
}
@@ -332,7 +336,7 @@ public class IntersectingIterator implem
if (docIDCompare > 0) {
// seek forwards
Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
continue;
}
// this source is at the current row, in its column family, and at currentCQ
@@ -485,8 +489,6 @@ public class IntersectingIterator implem
currentPartition = new Text();
currentDocID.set(emptyByteArray);
- this.seekColumnFamilies = seekColumnFamilies;
- this.inclusive = inclusive;
// seek each of the sources to the right column family within the row given by key
for (int i = 0; i < sourcesCount; i++) {
@@ -497,9 +499,11 @@ public class IntersectingIterator implem
} else {
sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term);
}
- sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive);
+ // Seek only to the term for this source as a column family
+ sources[i].iter.seek(new Range(sourceKey, true, null, false), sources[i].seekColfams, true);
} else {
- sources[i].iter.seek(range, seekColumnFamilies, inclusive);
+ // Seek only to the term for this source as a column family
+ sources[i].iter.seek(range, sources[i].seekColfams, true);
}
}
advanceToIntersection();