You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/07/05 23:04:29 UTC

svn commit: r1357915 - in /accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators: OrIterator.java SortedKeyValueIterator.java user/IndexedDocIterator.java user/IntersectingIterator.java

Author: ecn
Date: Thu Jul  5 21:04:28 2012
New Revision: 1357915

URL: http://svn.apache.org/viewvc?rev=1357915&view=rev
Log:
ACCUMULO-665: patch from Josh Elser to properly pass column family information for intersecting iterators

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java Thu Jul  5 21:04:28 2012
@@ -19,10 +19,12 @@ package org.apache.accumulo.core.iterato
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -47,15 +49,19 @@ public class OrIterator implements Sorte
   protected static class TermSource implements Comparable<TermSource> {
     public SortedKeyValueIterator<Key,Value> iter;
     public Text term;
+    public Collection<ByteSequence> seekColfams;
     
     public TermSource(TermSource other) {
       this.iter = other.iter;
       this.term = other.term;
+      this.seekColfams = other.seekColfams;
     }
     
     public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
       this.iter = iter;
       this.term = term;
+      // The only column family this source needs to read is the term itself
+      this.seekColfams = Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
     }
     
     public int compareTo(TermSource o) {
@@ -143,7 +149,7 @@ public class OrIterator implements Sorte
           newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
         }
       }
-      currentTerm.iter.seek(newRange, columnFamilies, inclusive);
+      currentTerm.iter.seek(newRange, currentTerm.seekColfams, true);
       
       // If there is no top key
       // OR we are:
@@ -166,7 +172,7 @@ public class OrIterator implements Sorte
     // because an Or must have at least two elements.
     if (currentTerm == null) {
       for (TermSource TS : sources) {
-        TS.iter.seek(range, columnFamilies, inclusive);
+        TS.iter.seek(range, TS.seekColfams, true);
         
         if ((TS.iter.hasTop()) && ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) == 0)))
           sorted.add(TS);
@@ -196,7 +202,8 @@ public class OrIterator implements Sorte
         }
       }
       
-      TS.iter.seek(newRange, columnFamilies, inclusive);
+      // Seek using only this source's term as the column family
+      TS.iter.seek(newRange, TS.seekColfams, true);
       
       // If there is no top key
       // OR we are:

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java Thu Jul  5 21:04:28 2012
@@ -81,6 +81,13 @@ public interface SortedKeyValueIterator<
    * Iterators that examine groups of adjacent key/value pairs (e.g. rows) to determine their top key and value should be sure that they properly handle a seek
    * to a key in the middle of such a group (e.g. the middle of a row). Even if the client always seeks to a range containing an entire group (a,c), the tablet
    * server could send back a batch of entries corresponding to (a,b], then reseek the iterator to range (b,c) when the scan is continued.
+   *
+   * <p>{@code columnFamilies} is used, at the lowest level, to determine which data blocks inside of an RFile need to be opened for this iterator. These data
+   * blocks correspond to the locality groups defined for the given table. If no column families are provided, the data blocks for all locality groups inside
+   * of the relevant RFile will be opened and seeked in an attempt to find the correct start key, regardless of the start key of {@code range}.
+   *
+   * <p>When a table has multiple locality groups, it is important to set {@code columnFamilies} to the minimum set of column families actually required, so
+   * that data from unrelated locality groups is not inadvertently read.
    * 
    * @param range
    *          <tt>Range</tt> of keys to iterate over.

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java Thu Jul  5 21:04:28 2012
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator.TermSource;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -134,6 +135,10 @@ public class IndexedDocIterator extends 
       docColf = new Text(options.get(docFamilyOptionName));
     docSource = source.deepCopy(env);
     indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(indexColf.getBytes(), 0, indexColf.getLength()));
+    
+    for (TermSource ts : this.sources) {
+      ts.seekColfams = indexColfSet;
+    }
   }
   
   @Override
@@ -143,7 +148,7 @@ public class IndexedDocIterator extends 
   
   @Override
   public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
-    super.seek(range, indexColfSet, true);
+    super.seek(range, null, true);
     
   }
   

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java Thu Jul  5 21:04:28 2012
@@ -18,9 +18,11 @@ package org.apache.accumulo.core.iterato
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -48,6 +50,10 @@ import org.apache.log4j.Logger;
  * 
  * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
  * 
+ * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)}, because it seeks each term source using that source's
+ * own column families while performing intersections. Extending classes that need different column families should set {@link TermSource#seekColfams} in
+ * their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method.
+ * 
  * README.shard in docs/examples shows an example of using the IntersectingIterator.
  */
 public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> {
@@ -83,24 +89,26 @@ public class IntersectingIterator implem
   protected static class TermSource {
     public SortedKeyValueIterator<Key,Value> iter;
     public Text term;
+    public Collection<ByteSequence> seekColfams;
     public boolean notFlag;
     
     public TermSource(TermSource other) {
       this.iter = other.iter;
       this.term = other.term;
       this.notFlag = other.notFlag;
+      this.seekColfams = other.seekColfams;
     }
     
     public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
-      this.iter = iter;
-      this.term = term;
-      this.notFlag = false;
+      this(iter, term, false);
     }
     
     public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) {
       this.iter = iter;
       this.term = term;
       this.notFlag = notFlag;
+      // By default, the only column family this source needs to read is the term itself
+      this.seekColfams = Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
     }
     
     public String getTermString() {
@@ -121,10 +129,6 @@ public class IntersectingIterator implem
   protected Key topKey = null;
   protected Value value = new Value(emptyByteArray);
   
-  protected Collection<ByteSequence> seekColumnFamilies;
-  
-  protected boolean inclusive;
-  
   public IntersectingIterator() {}
   
   @Override
@@ -196,7 +200,7 @@ public class IntersectingIterator implem
         if (partitionCompare > 0) {
           // seek to at least the currentRow
           Key seekKey = buildKey(currentPartition, sources[sourceID].term);
-          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
           continue;
         }
         // check if this source has gone beyond currentRow
@@ -213,7 +217,7 @@ public class IntersectingIterator implem
           // if not, then seek forwards to the right columnFamily
           if (termCompare > 0) {
             Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
-            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
             continue;
           }
           // check if this source is beyond the right columnFamily
@@ -235,7 +239,7 @@ public class IntersectingIterator implem
         if (docIDCompare > 0) {
           // seek forwards
           Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
-          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
           continue;
         }
         // if we are equal to the target, this is an invalid result.
@@ -273,7 +277,7 @@ public class IntersectingIterator implem
         if (partitionCompare > 0) {
           // seek to at least the currentRow
           Key seekKey = buildKey(currentPartition, sources[sourceID].term);
-          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
           continue;
         }
         // check if this source has gone beyond currentRow
@@ -294,7 +298,7 @@ public class IntersectingIterator implem
           // if not, then seek forwards to the right columnFamily
           if (termCompare > 0) {
             Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
-            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
             continue;
           }
           // check if this source is beyond the right columnFamily
@@ -314,7 +318,7 @@ public class IntersectingIterator implem
               return true;
             }
             Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
-            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
             continue;
           }
         }
@@ -332,7 +336,7 @@ public class IntersectingIterator implem
         if (docIDCompare > 0) {
           // seek forwards
           Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
-          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
           continue;
         }
         // this source is at the current row, in its column family, and at currentCQ
@@ -485,8 +489,6 @@ public class IntersectingIterator implem
     currentPartition = new Text();
     currentDocID.set(emptyByteArray);
     
-    this.seekColumnFamilies = seekColumnFamilies;
-    this.inclusive = inclusive;
     
     // seek each of the sources to the right column family within the row given by key
     for (int i = 0; i < sourcesCount; i++) {
@@ -497,9 +499,11 @@ public class IntersectingIterator implem
         } else {
           sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term);
         }
-        sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive);
+        // Seek using only this source's column families (by default, just its term)
+        sources[i].iter.seek(new Range(sourceKey, true, null, false), sources[i].seekColfams, true);
       } else {
-        sources[i].iter.seek(range, seekColumnFamilies, inclusive);
+        // Seek using only this source's column families (by default, just its term)
+        sources[i].iter.seek(range, sources[i].seekColfams, true);
       }
     }
     advanceToIntersection();