You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2011/12/27 19:19:44 UTC

svn commit: r1224966 [6/10] - in /incubator/accumulo/branches/1.4: ./ contrib/accumulo_sample/ contrib/accumulo_sample/ingest/src/main/java/aggregator/ contrib/accumulo_sample/ingest/src/main/java/ingest/ contrib/accumulo_sample/ingest/src/test/java/ag...

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java Tue Dec 27 18:19:43 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package iterator;
 
 import java.io.IOException;
@@ -23,9 +23,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.PriorityQueue;
 
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -33,798 +30,796 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 /**
- * An iterator that handles "OR" query constructs on the server side.
- * This code has been adapted/merged from Heap and Multi Iterators.
+ * An iterator that handles "OR" query constructs on the server side. This code has been adapted/merged from Heap and Multi Iterators.
  */
-public class OrIterator implements SortedKeyValueIterator<Key, Value> {
-
-    private TermSource currentTerm;
-    private ArrayList<TermSource> sources;
-    private PriorityQueue<TermSource> sorted = new PriorityQueue<TermSource>(5);
-    private static final Text nullText = new Text();
-    private Key topKey = null;
-    private Range overallRange;
-    private Collection<ByteSequence> columnFamilies;
-    private boolean inclusive;
-    protected static final Logger log = Logger.getLogger(OrIterator.class);
-    private Text parentEndRow;
-
-    protected static class TermSource implements Comparable<TermSource> {
-
-        public SortedKeyValueIterator<Key, Value> iter;
-        public Text dataLocation;
-        public Text term;
-        public Text docid;
-        public Text fieldTerm;
-        public Key topKey;
-        public boolean atEnd;
-
-        public TermSource(TermSource other) {
-            this.iter = other.iter;
-            this.term = other.term;
-            this.dataLocation = other.dataLocation;
-            this.atEnd = other.atEnd;
-        }
-
-        public TermSource(SortedKeyValueIterator<Key, Value> iter, Text term) {
-            this.iter = iter;
-            this.term = term;
-            this.atEnd = false;
-        }
-
-        public TermSource(SortedKeyValueIterator<Key, Value> iter, Text dataLocation, Text term) {
-            this.iter = iter;
-            this.dataLocation = dataLocation;
-            this.term = term;
-            this.atEnd = false;
-        }
-
-        public void setNew() {
-            if (!this.atEnd && this.iter.hasTop()) {
-                this.topKey = this.iter.getTopKey();
-
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.TermSource.setNew TS.iter.topKey >>" + topKey + "<<");
-                }
-
-                if (this.term == null) {
-                    this.docid = this.topKey.getColumnQualifier();
-                } else {
-                    String cqString = this.topKey.getColumnQualifier().toString();
-
-                    int idx = cqString.indexOf("\0");
-                    this.fieldTerm = new Text(cqString.substring(0, idx));
-                    this.docid = new Text(cqString.substring(idx + 1));
-                }
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.TermSource.setNew Setting to null...");
-                }
-
-                //this.term = null;
-                //this.dataLocation = null;
-                this.topKey = null;
-                this.fieldTerm = null;
-                this.docid = null;
-            }
-        }
-
-        public int compareTo(TermSource o) {
-            // NOTE:  If your implementation can have more than one row in a tablet,
-            //        you must compare row key here first, then column qualifier.
-            // NOTE2: A null check is not needed because things are only added to the
-            //        sorted after they have been determined to be valid.
-            //return this.docid.compareTo(o.docid);
-            //return this.topKey.compareTo(o.topKey);
-
-
-            // NOTE!  We need to compare UID's, not Keys!
-            Key k1 = topKey;
-            Key k2 = o.topKey;
-            //return t1.compareTo(t2);
-            String uid1 = getUID(k1);
-            String uid2 = getUID(k2);
-
-            if (uid1 != null && uid2 != null) {
-                return uid1.compareTo(uid2);
-            } else if (uid1 == null && uid2 == null) {
-                return 0;
-            } else if (uid1 == null) {
-                return 1;
-            } else {
-                return -1;
-            }
-
-        }
-
-        @Override
-        public String toString() {
-            return "TermSource: " + this.dataLocation + " " + this.term;
-        }
-
-        public boolean hasTop() {
-            return this.topKey != null;
-        }
-    }
-
-    /**
-     * Returns the given key's row
-     * @param key
-     * @return
-     */
-    protected Text getPartition(Key key) {
-        return key.getRow();
-    }
-
-    /**
-     * Returns the given key's dataLocation
-     * @param key
-     * @return
-     */
-    protected Text getDataLocation(Key key) {
-        return key.getColumnFamily();
-    }
-
-    /**
-     * Returns the given key's term
-     * @param key
-     * @return
-     */
-    protected Text getTerm(Key key) {
-        int idx = 0;
-        String sKey = key.getColumnQualifier().toString();
-
-        idx = sKey.indexOf("\0");
-        return new Text(sKey.substring(0, idx));
-    }
-
-    /**
-     * Returns the given key's DocID
-     * @param key
-     * @return
-     */
-    protected Text getDocID(Key key) {
-        int idx = 0;
-        String sKey = key.getColumnQualifier().toString();
-
-        idx = sKey.indexOf("\0");
-        return new Text(sKey.substring(idx + 1));
-    }
-
-    /**
-     * Returns the given key's UID
-     * @param key
-     * @return
-     */
-    static protected String getUID(Key key) {
-        try {
-            int idx = 0;
-            String sKey = key.getColumnQualifier().toString();
-
-            idx = sKey.lastIndexOf("\0");
-            return sKey.substring(idx + 1);
-        } catch (Exception e) {
-            return null;
-        }
-    }
-
-    public OrIterator() {
-        this.sources = new ArrayList<TermSource>();
-    }
-
-    private OrIterator(OrIterator other, IteratorEnvironment env) {
-        this.sources = new ArrayList<TermSource>();
-
-        for (TermSource TS : other.sources) {
-            this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.dataLocation, TS.term));
-        }
-    }
-
-    public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
-        return new OrIterator(this, env);
-    }
-
-    public void addTerm(SortedKeyValueIterator<Key, Value> source, Text term, IteratorEnvironment env) {
-        if (log.isDebugEnabled()) {
-            log.debug("OI.addTerm Added source w/o family");
-            log.debug("OI.addTerm term >>" + term + "<<");
-        }
-
-        // Don't deepcopy an iterator
-        if (term == null) {
-            this.sources.add(new TermSource(source, term));
+public class OrIterator implements SortedKeyValueIterator<Key,Value> {
+  
+  private TermSource currentTerm;
+  private ArrayList<TermSource> sources;
+  private PriorityQueue<TermSource> sorted = new PriorityQueue<TermSource>(5);
+  private static final Text nullText = new Text();
+  private Key topKey = null;
+  private Range overallRange;
+  private Collection<ByteSequence> columnFamilies;
+  private boolean inclusive;
+  protected static final Logger log = Logger.getLogger(OrIterator.class);
+  private Text parentEndRow;
+  
+  protected static class TermSource implements Comparable<TermSource> {
+    
+    public SortedKeyValueIterator<Key,Value> iter;
+    public Text dataLocation;
+    public Text term;
+    public Text docid;
+    public Text fieldTerm;
+    public Key topKey;
+    public boolean atEnd;
+    
+    public TermSource(TermSource other) {
+      this.iter = other.iter;
+      this.term = other.term;
+      this.dataLocation = other.dataLocation;
+      this.atEnd = other.atEnd;
+    }
+    
+    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
+      this.iter = iter;
+      this.term = term;
+      this.atEnd = false;
+    }
+    
+    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text dataLocation, Text term) {
+      this.iter = iter;
+      this.dataLocation = dataLocation;
+      this.term = term;
+      this.atEnd = false;
+    }
+    
+    public void setNew() {
+      if (!this.atEnd && this.iter.hasTop()) {
+        this.topKey = this.iter.getTopKey();
+        
+        if (log.isDebugEnabled()) {
+          log.debug("OI.TermSource.setNew TS.iter.topKey >>" + topKey + "<<");
+        }
+        
+        if (this.term == null) {
+          this.docid = this.topKey.getColumnQualifier();
         } else {
-            this.sources.add(new TermSource(source.deepCopy(env), term));
-        }
-    }
-
-    public void addTerm(SortedKeyValueIterator<Key, Value> source, Text dataLocation, Text term, IteratorEnvironment env) {
-        if (log.isDebugEnabled()) {
-            log.debug("OI.addTerm Added source ");
-            log.debug("OI.addTerm family >>" + dataLocation + "<<      term >>" + term + "<<");
-        }
-
-        // Don't deepcopy an iterator
-        if (term == null) {
-            this.sources.add(new TermSource(source, dataLocation, term));
+          String cqString = this.topKey.getColumnQualifier().toString();
+          
+          int idx = cqString.indexOf("\0");
+          this.fieldTerm = new Text(cqString.substring(0, idx));
+          this.docid = new Text(cqString.substring(idx + 1));
+        }
+      } else {
+        if (log.isDebugEnabled()) {
+          log.debug("OI.TermSource.setNew Setting to null...");
+        }
+        
+        // this.term = null;
+        // this.dataLocation = null;
+        this.topKey = null;
+        this.fieldTerm = null;
+        this.docid = null;
+      }
+    }
+    
+    public int compareTo(TermSource o) {
+      // NOTE: If your implementation can have more than one row in a tablet,
+      // you must compare row key here first, then column qualifier.
+      // NOTE2: A null check is not needed because things are only added to the
+      // sorted after they have been determined to be valid.
+      // return this.docid.compareTo(o.docid);
+      // return this.topKey.compareTo(o.topKey);
+      
+      // NOTE! We need to compare UID's, not Keys!
+      Key k1 = topKey;
+      Key k2 = o.topKey;
+      // return t1.compareTo(t2);
+      String uid1 = getUID(k1);
+      String uid2 = getUID(k2);
+      
+      if (uid1 != null && uid2 != null) {
+        return uid1.compareTo(uid2);
+      } else if (uid1 == null && uid2 == null) {
+        return 0;
+      } else if (uid1 == null) {
+        return 1;
+      } else {
+        return -1;
+      }
+      
+    }
+    
+    @Override
+    public String toString() {
+      return "TermSource: " + this.dataLocation + " " + this.term;
+    }
+    
+    public boolean hasTop() {
+      return this.topKey != null;
+    }
+  }
+  
+  /**
+   * Returns the given key's row
+   * 
+   * @param key
+   * @return
+   */
+  protected Text getPartition(Key key) {
+    return key.getRow();
+  }
+  
+  /**
+   * Returns the given key's dataLocation
+   * 
+   * @param key
+   * @return
+   */
+  protected Text getDataLocation(Key key) {
+    return key.getColumnFamily();
+  }
+  
+  /**
+   * Returns the given key's term
+   * 
+   * @param key
+   * @return
+   */
+  protected Text getTerm(Key key) {
+    int idx = 0;
+    String sKey = key.getColumnQualifier().toString();
+    
+    idx = sKey.indexOf("\0");
+    return new Text(sKey.substring(0, idx));
+  }
+  
+  /**
+   * Returns the given key's DocID
+   * 
+   * @param key
+   * @return
+   */
+  protected Text getDocID(Key key) {
+    int idx = 0;
+    String sKey = key.getColumnQualifier().toString();
+    
+    idx = sKey.indexOf("\0");
+    return new Text(sKey.substring(idx + 1));
+  }
+  
+  /**
+   * Returns the given key's UID
+   * 
+   * @param key
+   * @return
+   */
+  static protected String getUID(Key key) {
+    try {
+      int idx = 0;
+      String sKey = key.getColumnQualifier().toString();
+      
+      idx = sKey.indexOf("\0");
+      return sKey.substring(idx + 1);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+  
+  public OrIterator() {
+    this.sources = new ArrayList<TermSource>();
+  }
+  
+  private OrIterator(OrIterator other, IteratorEnvironment env) {
+    this.sources = new ArrayList<TermSource>();
+    
+    for (TermSource TS : other.sources) {
+      this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.dataLocation, TS.term));
+    }
+  }
+  
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new OrIterator(this, env);
+  }
+  
+  public void addTerm(SortedKeyValueIterator<Key,Value> source, Text term, IteratorEnvironment env) {
+    if (log.isDebugEnabled()) {
+      log.debug("OI.addTerm Added source w/o family");
+      log.debug("OI.addTerm term >>" + term + "<<");
+    }
+    
+    // Don't deepcopy an iterator
+    if (term == null) {
+      this.sources.add(new TermSource(source, term));
+    } else {
+      this.sources.add(new TermSource(source.deepCopy(env), term));
+    }
+  }
+  
+  public void addTerm(SortedKeyValueIterator<Key,Value> source, Text dataLocation, Text term, IteratorEnvironment env) {
+    if (log.isDebugEnabled()) {
+      log.debug("OI.addTerm Added source ");
+      log.debug("OI.addTerm family >>" + dataLocation + "<<      term >>" + term + "<<");
+    }
+    
+    // Don't deepcopy an iterator
+    if (term == null) {
+      this.sources.add(new TermSource(source, dataLocation, term));
+    } else {
+      this.sources.add(new TermSource(source.deepCopy(env), dataLocation, term));
+    }
+  }
+  
+  /**
+   * Construct the topKey given the current <code>TermSource</code>
+   * 
+   * @param TS
+   * @return
+   */
+  protected Key buildTopKey(TermSource TS) {
+    if ((TS == null) || (TS.topKey == null)) {
+      return null;
+    }
+    
+    if (log.isDebugEnabled()) {
+      log.debug("OI.buildTopKey New topKey >>" + new Key(TS.topKey.getRow(), TS.dataLocation, TS.docid) + "<<");
+    }
+    
+    return new Key(TS.topKey.getRow(), TS.topKey.getColumnFamily(), TS.topKey.getColumnQualifier());
+  }
+  
+  final public void next() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("OI.next Enter: sorted.size = " + sorted.size() + " currentTerm = " + ((currentTerm == null) ? "null" : "not null"));
+    }
+    
+    if (currentTerm == null) {
+      if (log.isDebugEnabled()) {
+        log.debug("OI.next currentTerm is NULL... returning");
+      }
+      
+      topKey = null;
+      return;
+    }
+    
+    // Advance currentTerm
+    currentTerm.iter.next();
+    
+    advanceToMatch(currentTerm);
+    
+    currentTerm.setNew();
+    
+    // See if currentTerm is still valid, remove if not
+    if (log.isDebugEnabled()) {
+      log.debug("OI.next Checks (correct = 0,0,0): " + ((currentTerm.topKey != null) ? "0," : "1,") + ((currentTerm.dataLocation != null) ? "0," : "1,")
+          + ((currentTerm.term != null && currentTerm.fieldTerm != null) ? (currentTerm.term.compareTo(currentTerm.fieldTerm)) : "0"));
+    }
+    
+    if (currentTerm.topKey == null || ((currentTerm.dataLocation != null) && (currentTerm.term.compareTo(currentTerm.fieldTerm) != 0))) {
+      if (log.isDebugEnabled()) {
+        log.debug("OI.next removing entry:" + currentTerm.term);
+      }
+      
+      currentTerm = null;
+    }
+    
+    // optimization.
+    // if size == 0, currentTerm is the only item left,
+    // OR there are no items left.
+    // In either case, we don't need to use the PriorityQueue
+    if (sorted.size() > 0) {
+      // sort the term back in
+      if (currentTerm != null) {
+        sorted.add(currentTerm);
+      }
+      // and get the current top item out.
+      currentTerm = sorted.poll();
+    }
+    
+    if (log.isDebugEnabled()) {
+      log.debug("OI.next CurrentTerm is " + ((currentTerm == null) ? "null" : currentTerm));
+    }
+    
+    topKey = buildTopKey(currentTerm);
+    
+    if (hasTop()) {
+      if (overallRange != null && !overallRange.contains(topKey)) {
+        topKey = null;
+      }
+    }
+  }
+  
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    
+    overallRange = new Range(range);
+    if (log.isDebugEnabled()) {
+      log.debug("seek, overallRange: " + overallRange);
+    }
+    
+    // if (range.getStartKey() != null && range.getStartKey().getRow() != null) {
+    // this.parentStartRow = range.getStartKey().getRow();
+    // }
+    
+    if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
+      this.parentEndRow = range.getEndKey().getRow();
+    }
+    
+    if (log.isDebugEnabled()) {
+      log.debug("OI.seek Entry - sources.size = " + sources.size());
+      log.debug("OI.seek Entry - currentTerm = " + ((currentTerm == null) ? "false" : currentTerm.iter.getTopKey()));
+      log.debug("OI.seek Entry - Key from Range = " + ((range == null) ? "false" : range.getStartKey()));
+    }
+    
+    // If sources.size is 0, there is nothing to process, so just return.
+    if (sources.isEmpty()) {
+      currentTerm = null;
+      topKey = null;
+      return;
+    }
+    
+    this.columnFamilies = columnFamilies;
+    this.inclusive = inclusive;
+    
+    Range newRange = range;
+    Key sourceKey = null;
+    Key startKey = null;
+    
+    if (range != null) {
+      startKey = range.getStartKey();
+    }
+    
+    // Clear the PriorityQueue so that we can re-populate it.
+    sorted.clear();
+    
+    TermSource TS = null;
+    Iterator<TermSource> iter = sources.iterator();
+    // For each term, seek forward.
+    // if a hit is not found, delete it from future searches.
+    int counter = 1;
+    while (iter.hasNext()) {
+      TS = iter.next();
+      
+      TS.atEnd = false;
+      
+      if (sources.size() == 1) {
+        currentTerm = TS;
+      }
+      
+      if (log.isDebugEnabled()) {
+        log.debug("OI.seek on TS >>" + TS + "<<");
+        log.debug("OI.seek seeking source >>" + counter + "<< ");
+      }
+      
+      counter++;
+      
+      newRange = range;
+      sourceKey = null;
+      
+      if (startKey != null) {
+        // Construct the new key for the range
+        if (log.isDebugEnabled()) {
+          log.debug("OI.seek startKey >>" + startKey + "<<");
+        }
+        
+        if (startKey.getColumnQualifier() != null) {
+          sourceKey = new Key(startKey.getRow(), (TS.dataLocation == null) ? nullText : TS.dataLocation, new Text(((TS.term == null) ? "" : TS.term + "\0")
+              + range.getStartKey().getColumnQualifier()));
         } else {
-            this.sources.add(new TermSource(source.deepCopy(env), dataLocation, term));
-        }
-    }
-
-    /**
-     * Construct the topKey given the current <code>TermSource</code>
-     * @param TS
-     * @return
-     */
-    protected Key buildTopKey(TermSource TS) {
-        if ((TS == null) || (TS.topKey == null)) {
-            return null;
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("OI.buildTopKey New topKey >>" + new Key(TS.topKey.getRow(), TS.dataLocation, TS.docid) + "<<");
-        }
-
-        return new Key(TS.topKey.getRow(), TS.topKey.getColumnFamily(), TS.topKey.getColumnQualifier());
-    }
-
-    final public void next() throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("OI.next Enter: sorted.size = " + sorted.size() + " currentTerm = "
-                    + ((currentTerm == null) ? "null" : "not null"));
-        }
-
-        if (currentTerm == null) {
-            if (log.isDebugEnabled()) {
-                log.debug("OI.next currentTerm is NULL... returning");
-            }
-
-            topKey = null;
-            return;
+          sourceKey = new Key(startKey.getRow(), (TS.dataLocation == null) ? nullText : TS.dataLocation, (TS.term == null) ? nullText : TS.term);
         }
-
-        // Advance currentTerm
-        currentTerm.iter.next();
-
-        advanceToMatch(currentTerm);
-
-        currentTerm.setNew();
-
-        // See if currentTerm is still valid, remove if not
+        
         if (log.isDebugEnabled()) {
-            log.debug("OI.next Checks (correct = 0,0,0): "
-                    + ((currentTerm.topKey != null) ? "0," : "1,")
-                    + ((currentTerm.dataLocation != null) ? "0," : "1,")
-                    + ((currentTerm.term != null && currentTerm.fieldTerm != null) ? (currentTerm.term.compareTo(currentTerm.fieldTerm)) : "0"));
-        }
-
-        if (currentTerm.topKey == null || ((currentTerm.dataLocation != null) && (currentTerm.term.compareTo(currentTerm.fieldTerm) != 0))) {
-            if (log.isDebugEnabled()) {
-                log.debug("OI.next removing entry:" + currentTerm.term);
-            }
-
-            currentTerm = null;
-        }
-
-
-        // optimization.
-        // if size == 0, currentTerm is the only item left,
-        // OR there are no items left.
-        // In either case, we don't need to use the PriorityQueue
-        if (sorted.size() > 0) {
-            // sort the term back in
-            if (currentTerm != null) {
-                sorted.add(currentTerm);
-            }
-            // and get the current top item out.
-            currentTerm = sorted.poll();
+          log.debug("OI.seek Seeking to the key => " + sourceKey);
         }
-
+        
+        newRange = new Range(sourceKey, true, sourceKey.followingKey(PartialKey.ROW), false);
+      } else {
+        if (log.isDebugEnabled()) {
+          log.debug("OI.seek Using the range Seek() argument to seek => " + newRange);
+        }
+      }
+      
+      TS.iter.seek(newRange, columnFamilies, inclusive);
+      
+      TS.setNew();
+      
+      // Make sure we're on a key with the correct dataLocation and term
+      advanceToMatch(TS);
+      
+      TS.setNew();
+      
+      if (log.isDebugEnabled()) {
+        log.debug("OI.seek sourceKey >>" + sourceKey + "<< ");
+        log.debug("OI.seek topKey >>" + ((TS.topKey == null) ? "false" : TS.topKey) + "<< ");
+        log.debug("OI.seek TS.fieldTerm == " + TS.fieldTerm);
+        
+        log.debug("OI.seek Checks (correct = 0,0,0 / 0,1,1): " + ((TS.topKey != null) ? "0," : "1,") + ((TS.dataLocation != null) ? "0," : "1,")
+            + (((TS.term != null && TS.fieldTerm != null) && (TS.term.compareTo(TS.fieldTerm) != 0)) ? "0" : "1"));
+      }
+      
+      if ((TS.topKey == null) || ((TS.dataLocation != null) && (TS.term.compareTo(TS.fieldTerm) != 0))) {
+        // log.debug("OI.seek Removing " + TS.term);
+        // iter.remove();
+      } // Optimization if we only have one element
+      else if (sources.size() > 0 || iter.hasNext()) {
+        // We have more than one source to search for, use the priority queue
+        sorted.add(TS);
+      } else {
+        // Don't need to continue, only had one item to search
         if (log.isDebugEnabled()) {
-            log.debug("OI.next CurrentTerm is " + ((currentTerm == null) ? "null" : currentTerm));
+          log.debug("OI.seek new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
         }
-
-        topKey = buildTopKey(currentTerm);
-
+        
+        // make sure it is in the range if we have one.
         if (hasTop()) {
-            if (overallRange != null && !overallRange.contains(topKey)) {
-                topKey = null;
-            }
-        }
-    }
-
-    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-
-        overallRange = new Range(range);
-        if(log.isDebugEnabled()){
-            log.debug("seek, overallRange: "+overallRange);
-        }
-
-//        if (range.getStartKey() != null && range.getStartKey().getRow() != null) {
-//            this.parentStartRow = range.getStartKey().getRow();
-//        }
-
-        if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
-            this.parentEndRow = range.getEndKey().getRow();
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("OI.seek Entry - sources.size = " + sources.size());
-            log.debug("OI.seek Entry - currentTerm = " + ((currentTerm == null) ? "false" : currentTerm.iter.getTopKey()));
-            log.debug("OI.seek Entry - Key from Range = " + ((range == null) ? "false" : range.getStartKey()));
-        }
-
-        // If sources.size is 0, there is nothing to process, so just return.
-        if (sources.isEmpty()) {
-            currentTerm = null;
-            topKey = null;
-            return;
-        }
-
-        this.columnFamilies = columnFamilies;
-        this.inclusive = inclusive;
-
-
-        Range newRange = range;
-        Key sourceKey = null;
-        Key startKey = null;
-
-        if (range != null) {
-            startKey = range.getStartKey();
-        }
-
-        // Clear the PriorityQueue so that we can re-populate it.
-        sorted.clear();
-
-        TermSource TS = null;
-        Iterator<TermSource> iter = sources.iterator();
-        // For each term, seek forward.
-        // if a hit is not found, delete it from future searches.
-        int counter = 1;
-        while (iter.hasNext()) {
-            TS = iter.next();
-
-            TS.atEnd = false;
-
-            if (sources.size() == 1) {
-                currentTerm = TS;
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("OI.seek on TS >>" + TS + "<<");
-                log.debug("OI.seek seeking source >>" + counter + "<< ");
-            }
-
-            counter++;
-
-            newRange = range;
-            sourceKey = null;
-
-            if (startKey != null) {
-                // Construct the new key for the range
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.seek startKey >>" + startKey + "<<");
-                }
-
-                if (startKey.getColumnQualifier() != null) {
-                    sourceKey = new Key(startKey.getRow(),
-                            (TS.dataLocation == null) ? nullText : TS.dataLocation,
-                            new Text(((TS.term == null) ? "" : TS.term + "\0") + range.getStartKey().getColumnQualifier()));
-                } else {
-                    sourceKey = new Key(startKey.getRow(),
-                            (TS.dataLocation == null) ? nullText : TS.dataLocation,
-                            (TS.term == null) ? nullText : TS.term);
-                }
-
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.seek Seeking to the key => " + sourceKey);
-                }
-
-                newRange = new Range(sourceKey, true, sourceKey.followingKey(PartialKey.ROW), false);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.seek Using the range Seek() argument to seek => " + newRange);
-                }
-            }
-
-            TS.iter.seek(newRange, columnFamilies, inclusive);
-
-            TS.setNew();
-
-            // Make sure we're on a key with the correct dataLocation and term
-            advanceToMatch(TS);
-
-            TS.setNew();
-
+          if (overallRange != null && !overallRange.contains(topKey)) {
             if (log.isDebugEnabled()) {
-                log.debug("OI.seek sourceKey >>" + sourceKey + "<< ");
-                log.debug("OI.seek topKey >>" + ((TS.topKey == null) ? "false" : TS.topKey) + "<< ");
-                log.debug("OI.seek TS.fieldTerm == " + TS.fieldTerm);
-
-                log.debug("OI.seek Checks (correct = 0,0,0 / 0,1,1): "
-                        + ((TS.topKey != null) ? "0," : "1,")
-                        + ((TS.dataLocation != null) ? "0," : "1,")
-                        + (((TS.term != null && TS.fieldTerm != null) && (TS.term.compareTo(TS.fieldTerm) != 0)) ? "0" : "1"));
-            }
-
-            if ((TS.topKey == null) || ((TS.dataLocation != null) && (TS.term.compareTo(TS.fieldTerm) != 0))) {
-                //log.debug("OI.seek Removing " + TS.term);
-                //iter.remove();
-            } // Optimization if we only have one element
-            else if (sources.size() > 0 || iter.hasNext()) {
-                // We have more than one source to search for, use the priority queue
-                sorted.add(TS);
-            } else {
-                // Don't need to continue, only had one item to search
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.seek new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
-                }
-
-                // make sure it is in the range if we have one.
-                if (hasTop()) {
-                    if (overallRange != null && !overallRange.contains(topKey)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("seek, topKey: " + topKey + " is not in the overallRange: " + overallRange);
-                        }
-                        topKey = null;
-                    }
-                }
-                return;
-            }
-        }
-
-        // And set currentTerm = the next valid key/term.
-        currentTerm = sorted.poll();
-
-        if (log.isDebugEnabled()) {
-            log.debug("OI.seek currentTerm = " + currentTerm);
-        }
-
-        topKey = buildTopKey(currentTerm);
-        if (topKey == null) {
-            if (log.isDebugEnabled()) {
-                log.debug("OI.seek() topKey is null");
-            }
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("OI.seek new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
-        }
-
-        if (hasTop()) {
-            if (overallRange != null && !overallRange.contains(topKey)) {
-                if (log.isDebugEnabled()) {
-                    log.debug("seek, topKey: " + topKey + " is not in the overallRange: " + overallRange);
-                }
-                topKey = null;
+              log.debug("seek, topKey: " + topKey + " is not in the overallRange: " + overallRange);
             }
+            topKey = null;
+          }
         }
-
-    }
-
-    final public Key getTopKey() {
-        if (log.isDebugEnabled()) {
-            log.debug("OI.getTopKey key >>" + topKey);
-        }
-
-        return topKey;
-    }
-
-    final public Value getTopValue() {
-        if (log.isDebugEnabled()) {
-            log.debug("OI.getTopValue key >>" + currentTerm.iter.getTopValue());
-        }
-
-        return currentTerm.iter.getTopValue();
-    }
-
-    final public boolean hasTop() {
-        if (log.isDebugEnabled()) {
-            log.debug("OI.hasTop  =  " + ((topKey == null) ? "false" : "true"));
-        }
-
-        return topKey != null;
-    }
-
-    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
-        throw new UnsupportedOperationException();
+        return;
+      }
     }
-
-    /**
-     * Ensures that the current <code>TermSource</code> is pointing to a key with the correct <code>dataLocation</code> and
-     * <code>term</code> or sets <code>topKey</code> to null if there is no such key remaining.
-     * @param TS The <code>TermSource</code> to advance
-     * @throws IOException
-     */
-    private void advanceToMatch(TermSource TS) throws IOException {
-        boolean matched = false;
-        while (!matched) {
-            if (!TS.iter.hasTop()) {
-                TS.topKey = null;
-                return;
-            }
-
-            Key iterTopKey = TS.iter.getTopKey();
-
-            if (log.isDebugEnabled()) {
-                log.debug("OI.advanceToMatch current topKey = " + iterTopKey);
-            }
-
-            // we should compare the row to the end of the range
-            if (overallRange.getEndKey() != null) {
-
-                if (overallRange != null && !overallRange.contains(TS.iter.getTopKey())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("overallRange: " + overallRange + " does not contain TS.iter.topKey: " + TS.iter.getTopKey());
-                        log.debug("OI.advanceToMatch at the end, returning");
-                    }
-
-                    TS.atEnd = true;
-                    TS.topKey = null;
-
-                    return;
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("OI.advanceToMatch not at the end");
-                    }
-                }
+    
+    // And set currentTerm = the next valid key/term.
+    currentTerm = sorted.poll();
+    
+    if (log.isDebugEnabled()) {
+      log.debug("OI.seek currentTerm = " + currentTerm);
+    }
+    
+    topKey = buildTopKey(currentTerm);
+    if (topKey == null) {
+      if (log.isDebugEnabled()) {
+        log.debug("OI.seek() topKey is null");
+      }
+    }
+    
+    if (log.isDebugEnabled()) {
+      log.debug("OI.seek new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
+    }
+    
+    if (hasTop()) {
+      if (overallRange != null && !overallRange.contains(topKey)) {
+        if (log.isDebugEnabled()) {
+          log.debug("seek, topKey: " + topKey + " is not in the overallRange: " + overallRange);
+        }
+        topKey = null;
+      }
+    }
+    
+  }
+  
+  /**
+   * Returns the key this iterator is currently positioned on.
+   *
+   * @return the current <code>topKey</code>; null when this iterator has no top (see <code>hasTop()</code>)
+   */
+  final public Key getTopKey() {
+    final Key current = topKey;
+    if (log.isDebugEnabled()) {
+      log.debug("OI.getTopKey key >>" + current);
+    }
+    return current;
+  }
+  
+  /**
+   * Returns the value the current term source is positioned on.
+   * <p>
+   * The value is fetched from <code>currentTerm.iter</code> exactly once, whether or not debug logging is enabled. <code>currentTerm</code> is dereferenced
+   * without a null check, so callers are expected to call this only while <code>hasTop()</code> is true.
+   *
+   * @return the top value of the current term source's iterator
+   */
+  final public Value getTopValue() {
+    final Value top = currentTerm.iter.getTopValue();
+    if (log.isDebugEnabled()) {
+      log.debug("OI.getTopValue key >>" + top);
+    }
+    return top;
+  }
+  
+  /**
+   * Reports whether this iterator is currently positioned on a key.
+   *
+   * @return true when <code>topKey</code> is not null
+   */
+  final public boolean hasTop() {
+    final boolean present = (topKey != null);
+    if (log.isDebugEnabled()) {
+      log.debug("OI.hasTop  =  " + (present ? "true" : "false"));
+    }
+    return present;
+  }
+  
+  /**
+   * Not supported by this iterator; every call fails.
+   *
+   * @param source
+   *          ignored
+   * @param options
+   *          ignored
+   * @param env
+   *          ignored
+   * @throws UnsupportedOperationException
+   *           always
+   */
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    throw new UnsupportedOperationException("OrIterator does not support init()");
+  }
+  
+  /**
+   * Moves the given <code>TermSource</code> forward until its underlying iterator is positioned on a key whose dataLocation and term both equal the ones the
+   * source is looking for.
+   * <p>
+   * The method gives up, setting <code>TS.topKey</code> to null, when the underlying iterator has no more keys or when <code>overallRange</code> has an end
+   * key and does not contain the iterator's top key; in the latter case <code>TS.atEnd</code> is set as well. When a match is found the method returns
+   * without touching <code>TS.topKey</code>.
+   *
+   * @param TS
+   *          The <code>TermSource</code> to advance
+   * @throws IOException
+   *           propagated from seeking the underlying iterator
+   */
+  private void advanceToMatch(TermSource TS) throws IOException {
+    while (true) {
+      if (!TS.iter.hasTop()) {
+        // Underlying iterator is exhausted, nothing left to match.
+        TS.topKey = null;
+        return;
+      }
+
+      Key iterTopKey = TS.iter.getTopKey();
+
+      if (log.isDebugEnabled()) {
+        log.debug("OI.advanceToMatch current topKey = " + iterTopKey);
+      }
+
+      // The range is only enforced when it has an end key. The null test on overallRange comes first so that a missing range is skipped instead of causing a
+      // NullPointerException.
+      if (overallRange != null && overallRange.getEndKey() != null) {
+        if (!overallRange.contains(iterTopKey)) {
+          if (log.isDebugEnabled()) {
+            log.debug("overallRange: " + overallRange + " does not contain TS.iter.topKey: " + iterTopKey);
+            log.debug("OI.advanceToMatch at the end, returning");
+          }
+
+          TS.atEnd = true;
+          TS.topKey = null;
+          return;
+        }
+
+        if (log.isDebugEnabled()) {
+          log.debug("OI.advanceToMatch not at the end");
+        }
+      } else if (log.isDebugEnabled()) {
+        log.debug("OI.advanceToMatch overallRange.getEndKey() == null");
+      }
+
+      // Advance to the correct dataLocation
+      if (log.isDebugEnabled()) {
+        log.debug("Comparing dataLocations.");
+        log.debug("OI.advanceToMatch dataLocationCompare: " + getDataLocation(iterTopKey) + " == " + TS.dataLocation);
+      }
+
+      int dataLocationCompare = getDataLocation(iterTopKey).compareTo(TS.dataLocation);
+
+      if (log.isDebugEnabled()) {
+        log.debug("OI.advanceToMatch dataLocationCompare = " + dataLocationCompare);
+      }
+
+      if (dataLocationCompare != 0) {
+        Key seekKey;
+        if (dataLocationCompare < 0) {
+          // Still before the wanted dataLocation: jump to it within the current row.
+          if (log.isDebugEnabled()) {
+            log.debug("OI.advanceToMatch seek to desired dataLocation");
+          }
+          seekKey = new Key(iterTopKey.getRow(), TS.dataLocation, nullText);
+        } else {
+          // Gone past the current dataLocation, seek to the next row
+          if (log.isDebugEnabled()) {
+            log.debug("OI.advanceToMatch advanced beyond desired dataLocation, seek to next row");
+          }
+          seekKey = iterTopKey.followingKey(PartialKey.ROW);
+        }
+
+        if (log.isDebugEnabled()) {
+          log.debug("OI.advanceToMatch seeking to => " + seekKey);
+        }
+
+        TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
+        continue;
+      }
+
+      // Advance to the correct term
+      if (log.isDebugEnabled()) {
+        log.debug("OI.advanceToMatch termCompare: " + getTerm(iterTopKey) + " == " + TS.term);
+      }
+
+      int termCompare = getTerm(iterTopKey).compareTo(TS.term);
+
+      if (log.isDebugEnabled()) {
+        log.debug("OI.advanceToMatch termCompare = " + termCompare);
+      }
+
+      if (termCompare != 0) {
+        Key seekKey;
+        if (termCompare < 0) {
+          // Still before the wanted term: jump to it within the current row and column family.
+          if (log.isDebugEnabled()) {
+            log.debug("OI.advanceToMatch seek to desired term");
+          }
+          seekKey = new Key(iterTopKey.getRow(), iterTopKey.getColumnFamily(), TS.term);
+        } else {
+          // Gone past the current term, seek to the next row
+          if (log.isDebugEnabled()) {
+            log.debug("OI.advanceToMatch advanced beyond desired term, seek to next row");
+          }
+          seekKey = iterTopKey.followingKey(PartialKey.ROW);
+        }
+
+        if (log.isDebugEnabled()) {
+          log.debug("OI.advanceToMatch seeking to => " + seekKey);
+        }
+
+        TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
+        continue;
+      }
+
+      // dataLocation and term both match: the source is positioned where it should be.
+      return;
+    }
+  }
+  
+  public boolean jump(Key jumpKey) throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("OR jump: " + jumpKey);
+      printTopKeysForTermSources();
+    }
+    
+    // is the jumpKey outside my overall range?
+    if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
+      // can't go there.
+      if (log.isDebugEnabled()) {
+        log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
+      }
+      return false;
+    }
+    
+    // Clear the PriorityQueue so that we can re-populate it.
+    sorted.clear();
+    
+    // check each term source and jump it if necessary.
+    for (TermSource ts : sources) {
+      int comp;
+      if (!ts.hasTop()) {
+        if (log.isDebugEnabled()) {
+          log.debug("jump called, but ts.topKey is null, this one needs to move to next row.");
+        }
+        Key endKey = null;
+        if (parentEndRow != null) {
+          endKey = new Key(parentEndRow);
+        }
+        Range newRange = new Range(jumpKey, true, endKey, false);
+        ts.iter.seek(newRange, columnFamilies, inclusive);
+        ts.setNew();
+        advanceToMatch(ts);
+        ts.setNew();
+        
+      } else {
+        // check row, then uid
+        comp = this.topKey.getRow().compareTo(jumpKey.getRow());
+        if (comp > 0) {
+          if (log.isDebugEnabled()) {
+            log.debug("jump, our row is ahead of jumpKey.");
+            log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + topKey.getRow() + " parentEndRow" + parentEndRow);
+          }
+          if (ts.hasTop()) {
+            sorted.add(ts);
+          }
+          // do nothing, we're ahead of jumpKey row and have topkey
+        } else if (comp < 0) { // a row behind jump key, need to move forward
+          if (log.isDebugEnabled()) {
+            log.debug("OR jump, row jump");
+          }
+          Key endKey = null;
+          if (parentEndRow != null) {
+            endKey = new Key(parentEndRow);
+          }
+          Key sKey = new Key(jumpKey.getRow());
+          Range fake = new Range(sKey, true, endKey, false);
+          ts.iter.seek(fake, columnFamilies, inclusive);
+          ts.setNew();
+          advanceToMatch(ts);
+          ts.setNew();
+        } else {
+          // need to check uid
+          String myUid = getUID(ts.topKey);
+          String jumpUid = getUID(jumpKey);
+          
+          if (log.isDebugEnabled()) {
+            if (myUid == null) {
+              log.debug("myUid is null");
             } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch overallRange.getEndKey() == null");
-                }
+              log.debug("myUid: " + myUid);
             }
-
-            // Advance to the correct dataLocation
-            if (log.isDebugEnabled()) {
-                log.debug("Comparing dataLocations.");
-                log.debug("OI.advanceToMatch dataLocationCompare: " + getDataLocation(iterTopKey) + " == " + TS.dataLocation);
-            }
-
-            int dataLocationCompare = getDataLocation(iterTopKey).compareTo(TS.dataLocation);
-
-            if (log.isDebugEnabled()) {
-                log.debug("OI.advanceToMatch dataLocationCompare = " + dataLocationCompare);
-            }
-
-            // Make sure we're at a row for this dataLocation
-            if (dataLocationCompare < 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch seek to desired dataLocation");
-                }
-
-                Key seekKey = new Key(iterTopKey.getRow(), TS.dataLocation, nullText);
-
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch seeking to => " + seekKey);
-                }
-
-                TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
-
-                continue;
-            } else if (dataLocationCompare > 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch advanced beyond desired dataLocation, seek to next row");
-                }
-
-                // Gone past the current dataLocation, seek to the next row
-                Key seekKey = iterTopKey.followingKey(PartialKey.ROW);
-
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch seeking to => " + seekKey);
-                }
-
-            TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
-
-                continue;
-            }
-
-            // Advance to the correct term
-            if (log.isDebugEnabled()) {
-                log.debug("OI.advanceToMatch termCompare: " + getTerm(iterTopKey) + " == " + TS.term);
-            }
-
-            int termCompare = getTerm(iterTopKey).compareTo(TS.term);
-
-            if (log.isDebugEnabled()) {
-                log.debug("OI.advanceToMatch termCompare = " + termCompare);
-            }
-
-            // Make sure we're at a row for this term
-            if (termCompare < 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch seek to desired term");
-                }
-
-                Key seekKey = new Key(iterTopKey.getRow(), iterTopKey.getColumnFamily(), TS.term);
-
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch seeking to => " + seekKey);
-                }
-
-                TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
-
-                continue;
-            } else if (termCompare > 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch advanced beyond desired term, seek to next row");
-                }
-
-                // Gone past the current term, seek to the next row
-                Key seekKey = iterTopKey.followingKey(PartialKey.ROW);
-
-                if (log.isDebugEnabled()) {
-                    log.debug("OI.advanceToMatch seeking to => " + seekKey);
-                }
-
-                TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
-                continue;
-            }
-
-            // If we made it here, we found a match
-            matched = true;
-        }
-    }
-
-    public boolean jump(Key jumpKey) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("OR jump: " + jumpKey);
-            printTopKeysForTermSources();
-        }
-
-        // is the jumpKey outside my overall range?
-        if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
-            //can't go there.
-            if (log.isDebugEnabled()) {
-                log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
-            }
-            return false;
-        }
-
-        // Clear the PriorityQueue so that we can re-populate it.
-        sorted.clear();
-
-        // check each term source and jump it if necessary.
-        for (TermSource ts : sources) {
-            int comp;
-            if (!ts.hasTop()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("jump called, but ts.topKey is null, this one needs to move to next row.");
-                }
-                Key endKey = null;
-                if (parentEndRow != null) {
-                    endKey = new Key(parentEndRow);
-                }
-                Range newRange = new Range(jumpKey, true, endKey, false);
-                ts.iter.seek(newRange, columnFamilies, inclusive);
-                ts.setNew();
-                advanceToMatch(ts);
-                ts.setNew();
-
+            
+            if (jumpUid == null) {
+              log.debug("jumpUid is null");
             } else {
-                // check row, then uid
-                comp = this.topKey.getRow().compareTo(jumpKey.getRow());
-                if (comp > 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("jump, our row is ahead of jumpKey.");
-                        log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + topKey.getRow() + " parentEndRow" + parentEndRow);
-                    }
-                    if (ts.hasTop()) {
-                        sorted.add(ts);
-                    }
-                    // do nothing, we're ahead of jumpKey row and have topkey
-                } else if (comp < 0) { //a row behind jump key, need to move forward
-                    if (log.isDebugEnabled()) {
-                        log.debug("OR jump, row jump");
-                    }
-                    Key endKey = null;
-                    if (parentEndRow != null) {
-                        endKey = new Key(parentEndRow);
-                    }
-                    Key sKey = new Key(jumpKey.getRow());
-                    Range fake = new Range(sKey, true, endKey, false);
-                    ts.iter.seek(fake, columnFamilies, inclusive);
-                    ts.setNew();
-                    advanceToMatch(ts);
-                    ts.setNew();
-                } else {
-                    //need to check uid
-                    String myUid = getUID(ts.topKey);
-                    String jumpUid = getUID(jumpKey);
-
-                    if (log.isDebugEnabled()) {
-                        if (myUid == null) {
-                            log.debug("myUid is null");
-                        } else {
-                            log.debug("myUid: " + myUid);
-                        }
-
-                        if (jumpUid == null) {
-                            log.debug("jumpUid is null");
-                        } else {
-                            log.debug("jumpUid: " + jumpUid);
-                        }
-                    }
-
-                    int ucomp = myUid.compareTo(jumpUid);
-                    if (ucomp < 0) {
-                        // need to move forward
-                        // create range and seek it.
-                        Text row = ts.topKey.getRow();
-                        Text cf = ts.topKey.getColumnFamily();
-                        String cq = ts.topKey.getColumnQualifier().toString().replaceAll(myUid, jumpUid);
-                        Text cq_text = new Text(cq);
-                        Key sKey = new Key(row, cf, cq_text);
-                        Key eKey = null;
-                        if (parentEndRow != null) {
-                            eKey = new Key(parentEndRow);
-                        }
-                        Range fake = new Range(sKey, true, eKey, false);
-                        if (log.isDebugEnabled()) {
-                            log.debug("uid jump, new ts.iter.seek range: " + fake);
-                        }
-                        ts.iter.seek(fake, columnFamilies, inclusive);
-                        ts.setNew();
-                        advanceToMatch(ts);
-                        ts.setNew();
-
-                        if (log.isDebugEnabled()) {
-                            if (ts.iter.hasTop()) {
-                                log.debug("ts.iter.topkey: " + ts.iter.getTopKey());
-                            } else {
-                                log.debug("ts.iter.topKey is null");
-                            }
-                        }
-                    }//else do nothing, we're ahead of jump key
-                }
-            }
-
-            // ts should have moved, validate this particular ts.
-            if (ts.hasTop()) {
-                if (overallRange != null) {
-                    if (overallRange.contains(topKey)) {
-                        //if (topKey.getRow().compareTo(parentEndRow) < 0) {
-                        sorted.add(ts);
-                    }
-                } else {
-                    sorted.add(ts);
-                }
+              log.debug("jumpUid: " + jumpUid);
             }
+          }
+          
+          int ucomp = myUid.compareTo(jumpUid);
+          if (ucomp < 0) {
+            // need to move forward
+            // create range and seek it.
+            Text row = ts.topKey.getRow();
+            Text cf = ts.topKey.getColumnFamily();
+            String cq = ts.topKey.getColumnQualifier().toString().replaceAll(myUid, jumpUid);
+            Text cq_text = new Text(cq);
+            Key sKey = new Key(row, cf, cq_text);
+            Key eKey = null;
+            if (parentEndRow != null) {
+              eKey = new Key(parentEndRow);
+            }
+            Range fake = new Range(sKey, true, eKey, false);
+            if (log.isDebugEnabled()) {
+              log.debug("uid jump, new ts.iter.seek range: " + fake);
+            }
+            ts.iter.seek(fake, columnFamilies, inclusive);
+            ts.setNew();
+            advanceToMatch(ts);
+            ts.setNew();
+            
+            if (log.isDebugEnabled()) {
+              if (ts.iter.hasTop()) {
+                log.debug("ts.iter.topkey: " + ts.iter.getTopKey());
+              } else {
+                log.debug("ts.iter.topKey is null");
+              }
+            }
+          }// else do nothing, we're ahead of jump key
+        }
+      }
+      
+      // ts should have moved, validate this particular ts.
+      if (ts.hasTop()) {
+        if (overallRange != null) {
+          if (overallRange.contains(topKey)) {
+            // if (topKey.getRow().compareTo(parentEndRow) < 0) {
+            sorted.add(ts);
+          }
+        } else {
+          sorted.add(ts);
         }
-        // now get the top key from all TermSources.
-        currentTerm = sorted.poll();
-        if (log.isDebugEnabled()) {
-            log.debug("OI.jump currentTerm = " + currentTerm);
-        }
-
-        topKey = buildTopKey(currentTerm);
-        if (log.isDebugEnabled()) {
-            log.debug("OI.jump new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
-        }
-        return hasTop();
+      }
     }
-
-    private void printTopKeysForTermSources() {
-        if (log.isDebugEnabled()) {
-            for (TermSource ts : sources) {
-                if (ts != null) {
-                    if (ts.topKey == null) {
-                        log.debug(ts.toString() + " topKey is null");
-                    } else {
-                        log.debug(ts.toString() + " topKey: " + ts.topKey);
-                    }
-                } else {
-                    log.debug("ts is null");
-                }
-            }
-
-            if (topKey != null) {
-                log.debug("OrIterator current topKey: " + topKey);
-            } else {
-                log.debug("OrIterator current topKey is null");
-            }
+    // now get the top key from all TermSources.
+    currentTerm = sorted.poll();
+    if (log.isDebugEnabled()) {
+      log.debug("OI.jump currentTerm = " + currentTerm);
+    }
+    
+    topKey = buildTopKey(currentTerm);
+    if (log.isDebugEnabled()) {
+      log.debug("OI.jump new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
+    }
+    return hasTop();
+  }
+  
+  /**
+   * Debug aid: when debug logging is enabled, logs the <code>topKey</code> of every <code>TermSource</code> in <code>sources</code> (or a note that the
+   * entry or its key is null), followed by this iterator's own <code>topKey</code>. Does nothing when debug logging is disabled.
+   */
+  private void printTopKeysForTermSources() {
+    if (log.isDebugEnabled()) {
+      for (TermSource ts : sources) {
+        if (ts != null) {
+          if (ts.topKey == null) {
+            log.debug(ts.toString() + " topKey is null");
+          } else {
+            log.debug(ts.toString() + " topKey: " + ts.topKey);
+          }
+        } else {
+          log.debug("ts is null");
         }
+      }
+      
+      if (topKey != null) {
+        log.debug("OrIterator current topKey: " + topKey);
+      } else {
+        log.debug("OrIterator current topKey is null");
+      }
     }
+  }
 }

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java Tue Dec 27 18:19:43 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package iterator;
 
 import java.io.IOException;
@@ -37,265 +37,261 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
 /**
- * This iterator takes the source iterator (the one below it in the iterator stack)
- * and puts it in a background thread. The background thread continues processing
- * and fills a queue with the Keys and Values from the source iterator. When seek()
- * is called on this iterator, it pauses the background thread, clears the queue, 
- * calls seek() on the source iterator, then resumes the thread filling the queue.
- *
- * Users of this iterator can set the queue size, default is five elements. Users 
- * must be aware of the potential for OutOfMemory errors when using this iterator
- * with large queue sizes or large objects. This iterator copies the Key and Value from
- * the source iterator and puts them into the queue.
+ * This iterator takes the source iterator (the one below it in the iterator stack) and puts it in a background thread. The background thread continues
+ * processing and fills a queue with the Keys and Values from the source iterator. When seek() is called on this iterator, it pauses the background thread,
+ * clears the queue, calls seek() on the source iterator, then resumes the thread filling the queue.
+ * 
+ * Users of this iterator can set the queue size (the default is five elements). Users must be aware of the potential for OutOfMemory errors when using this iterator
+ * with large queue sizes or large objects. This iterator copies the Key and Value from the source iterator and puts them into the queue.
  * 
- * This iterator introduces some parallelism into the server side iterator stack. One use
- * case for this would be when an iterator takes a relatively long time to process each
- * K,V pair and causes the iterators above it to wait. By putting the longer running iterator
- * in a background thread we should be able to achieve greater throughput.
+ * This iterator introduces some parallelism into the server side iterator stack. One use case for this would be when an iterator takes a relatively long time
+ * to process each K,V pair and causes the iterators above it to wait. By putting the longer running iterator in a background thread we should be able to
+ * achieve greater throughput.
  * 
  * NOTE: Experimental!
- *
+ * 
  */
-public class ReadAheadIterator implements SortedKeyValueIterator<Key, Value>, OptionDescriber {
-
-	private static Logger log = Logger.getLogger(ReadAheadIterator.class);
-	
-	public static final String QUEUE_SIZE = "queue.size";
-	
-	public static final String TIMEOUT = "timeout";
-	
-	private static final QueueElement noMoreDataElement = new QueueElement();
-	
-	private int queueSize = 5;
-	
-	private int timeout = 60;
-	
-	/**
-	 * 
-	 * Class to hold key and value from the producing thread.
-	 *
-	 */
-	static class QueueElement {
-		Key key = null;
-		Value value = null;
-		public QueueElement() {}
-		public QueueElement(Key key, Value value) {
-			super();
-			this.key = new Key(key);
-			this.value = new Value(value.get(),true);
-		}
-		public Key getKey() {
-			return key;
-		}
-		public Value getValue() {
-			return value;
-		}
-	}
-	
-	/**
-	 * 
-	 * Thread that produces data from the source iterator and places the results
-	 * in a queue.
-	 *
-	 */
-	class ProducerThread extends ReentrantLock implements Runnable {
-		
-		private static final long serialVersionUID = 1L;
-
-		private Exception e = null;
-		
-		private int waitTime = timeout;
-		
-		private SortedKeyValueIterator<Key, Value> sourceIter = null;
-		
-		public ProducerThread(SortedKeyValueIterator<Key, Value> source) {
-			this.sourceIter = source;
-		}
-				
-		public void run() {
-			boolean hasMoreData = true;
-			//Keep this thread running while there is more data to read
-			//and items left in the queue to be read off.
-			while (hasMoreData || queue.size() > 0) {
-				try {
-					//Acquire the lock, this will wait if the lock is being
-					//held by the ReadAheadIterator.seek() method.
-					this.lock();
-					//Check to see if there is more data from the iterator below.
-					hasMoreData = sourceIter.hasTop();
-					//Break out of the loop if no more data.
-					if (!hasMoreData)
-						continue;
-					//Put the next K,V onto the queue.
-					try {
-						QueueElement e = new QueueElement(sourceIter.getTopKey(), sourceIter.getTopValue());
-						boolean inserted = false;
-						try {
-							inserted = queue.offer(e, this.waitTime, TimeUnit.SECONDS);
-						} catch (InterruptedException ie) {
-							this.e = ie;
-							break;
-						}
-						if (!inserted) {
-							//Then we either got a timeout, set the error and break out of the loop
-							this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
-							break;
-						}
-						//Move the iterator to the next K,V for the next iteration of this loop
-						sourceIter.next();
-					} catch (Exception e) {
-						this.e = e;
-						log.error("Error calling next on source iterator", e);
-						break;
-					}
-				} finally {
-					this.unlock();
-				}
-			}
-			//If we broke out of the loop because of an error, then don't put the marker on the queue, just to do end.
-			if (!hasError()) {
-				//Put the special end of data marker into the queue
-				try {
-					queue.put(noMoreDataElement);
-				} catch (InterruptedException e) {
-					this.e = e;
-					log.error("Error putting End of Data marker onto queue");
-				}
-			}
-		}
-		
-		public boolean hasError() {
-			return (this.e != null);
-		}
-		
-		public Exception getError() {
-			return this.e;
-		}
-	}
-	
-	private SortedKeyValueIterator<Key, Value> source;
-	private ArrayBlockingQueue<QueueElement> queue = null;
-	private QueueElement currentElement = new QueueElement();
-	private ProducerThread thread = null;
-	private Thread t = null;
-	
-	protected ReadAheadIterator(ReadAheadIterator other, IteratorEnvironment env)
-	{
-		source = other.source.deepCopy(env);
-	}
-		
-	public ReadAheadIterator(){}
-
-	public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
-		return new ReadAheadIterator(this, env);
-	}
-
-	public Key getTopKey() {
-		return currentElement.getKey();
-	}
-
-	public Value getTopValue() {
-		return currentElement.getValue();
-	}
-
-	public boolean hasTop() {
-		if (currentElement == noMoreDataElement)
-			return false;
-		return currentElement != null || queue.size() > 0 || source.hasTop();
-	}
-
-	public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
-		validateOptions(options);
-		this.source = source;
-		queue = new ArrayBlockingQueue<QueueElement>(queueSize);
-		thread = new ProducerThread(this.source);
-		t = new Thread(thread, "ReadAheadIterator-SourceThread");
-		t.start();
-	}
-
-	/**
-	 * Populate the key and value
-	 */
-	public void next() throws IOException {
-		//Thread startup race condition, need to make sure that the 
-		//thread has started before we call this the first time.
-		while (t.getState().equals(State.NEW)) {
-			try {
-				Thread.sleep(1);
-			} catch (InterruptedException e) {}
-		}
-		
-		if (t.getState().equals(State.TERMINATED)) {
-			//Thread encountered an error.
-			if (thread.hasError()) {
-				//and it should
-				throw new IOException("Background thread has died", thread.getError());
-			}
-		}
-		
-		//Pull an element off the queue, this will wait if there is no data yet.
-		try {
-			if (thread.hasError())
-				throw new IOException("background thread has error", thread.getError());
-			
-			QueueElement nextElement = null;
-			while (null == nextElement) {
-				try {
-					nextElement = queue.poll(1, TimeUnit.SECONDS);
-				} catch (InterruptedException e) {
-					//TODO: Do we need to do anything here?
-				}
-				if (null == nextElement) {
-					//Then we have no data and timed out, check for error condition in the read ahead thread
-					if (thread.hasError()) {
-						throw new IOException("background thread has error", thread.getError());
-					}
-				}
-			}
-			currentElement = nextElement;
-		} catch (IOException e) {
-			throw new IOException("Error getting element from source iterator", e);
-		}		
-	}
-	
-	/**
-	 * Seek to the next matching cell and call next to populate the key and value.
-	 */
-	public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-		if (t.isAlive()) {
-			//Check for error
-			if (thread.hasError())
-				throw new IOException("background thread has error", thread.getError());
-			
-			try {
-				//Acquire the lock, or wait until its unlocked by the producer thread.
-				thread.lock();
-				queue.clear();
-				currentElement = null;
-				source.seek(range, columnFamilies, inclusive);
-			} finally {
-				thread.unlock();
-			}
-			next();
-		} else {
-			throw new IOException("source iterator thread has died.");
-		}
-	}
-
-	public IteratorOptions describeOptions() {
-		Map<String,String> options = new HashMap<String,String>();
-		options.put(QUEUE_SIZE, "read ahead queue size");
-		options.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
-		return new IteratorOptions(getClass().getSimpleName(),"Iterator that puts the source in another thread",
-				options, null);
-	}
-
-	public boolean validateOptions(Map<String, String> options) {
-		if (options.containsKey(QUEUE_SIZE))
-			queueSize = Integer.parseInt(options.get(QUEUE_SIZE));
-		if (options.containsKey(TIMEOUT))
-			timeout = Integer.parseInt(options.get(TIMEOUT));
-		return true;
-	}
-
+public class ReadAheadIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+  
+  private static Logger log = Logger.getLogger(ReadAheadIterator.class);
+  
+  public static final String QUEUE_SIZE = "queue.size";
+  
+  public static final String TIMEOUT = "timeout";
+  
+  private static final QueueElement noMoreDataElement = new QueueElement();
+  
+  private int queueSize = 5;
+  
+  private int timeout = 60;
+  
+  /**
+   * 
+   * Class to hold key and value from the producing thread.
+   * 
+   */
+  static class QueueElement {
+    Key key = null;
+    Value value = null;
+    
+    public QueueElement() {}
+    
+    public QueueElement(Key key, Value value) {
+      super();
+      this.key = new Key(key);
+      this.value = new Value(value.get(), true);
+    }
+    
+    public Key getKey() {
+      return key;
+    }
+    
+    public Value getValue() {
+      return value;
+    }
+  }
+  
+  /**
+   * 
+   * Thread that produces data from the source iterator and places the results in a queue.
+   * 
+   */
+  class ProducerThread extends ReentrantLock implements Runnable {
+    
+    private static final long serialVersionUID = 1L;
+    
+    private Exception e = null;
+    
+    private int waitTime = timeout;
+    
+    private SortedKeyValueIterator<Key,Value> sourceIter = null;
+    
+    public ProducerThread(SortedKeyValueIterator<Key,Value> source) {
+      this.sourceIter = source;
+    }
+    
+    public void run() {
+      boolean hasMoreData = true;
+      // Keep this thread running while there is more data to read
+      // and items left in the queue to be read off.
+      while (hasMoreData || queue.size() > 0) {
+        try {
+          // Acquire the lock, this will wait if the lock is being
+          // held by the ReadAheadIterator.seek() method.
+          this.lock();
+          // Check to see if there is more data from the iterator below.
+          hasMoreData = sourceIter.hasTop();
+          // No more data: skip to the loop condition, which ends the loop once the queue is empty.
+          if (!hasMoreData)
+            continue;
+          // Put the next K,V onto the queue.
+          try {
+            QueueElement e = new QueueElement(sourceIter.getTopKey(), sourceIter.getTopValue());
+            boolean inserted = false;
+            try {
+              inserted = queue.offer(e, this.waitTime, TimeUnit.SECONDS);
+            } catch (InterruptedException ie) {
+              this.e = ie;
+              break;
+            }
+            if (!inserted) {
+              // The offer timed out: record the error and break out of the loop
+              this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
+              break;
+            }
+            // Move the iterator to the next K,V for the next iteration of this loop
+            sourceIter.next();
+          } catch (Exception e) {
+            this.e = e;
+            log.error("Error calling next on source iterator", e);
+            break;
+          }
+        } finally {
+          this.unlock();
+        }
+      }
+      // If we broke out of the loop because of an error, don't put the end-of-data marker on the queue; just end.
+      if (!hasError()) {
+        // Put the special end of data marker into the queue
+        try {
+          queue.put(noMoreDataElement);
+        } catch (InterruptedException e) {
+          this.e = e;
+          log.error("Error putting End of Data marker onto queue");
+        }
+      }
+    }
+    
+    public boolean hasError() {
+      return (this.e != null);
+    }
+    
+    public Exception getError() {
+      return this.e;
+    }
+  }
+  
+  private SortedKeyValueIterator<Key,Value> source;
+  private ArrayBlockingQueue<QueueElement> queue = null;
+  private QueueElement currentElement = new QueueElement();
+  private ProducerThread thread = null;
+  private Thread t = null;
+  
+  protected ReadAheadIterator(ReadAheadIterator other, IteratorEnvironment env) {
+    source = other.source.deepCopy(env);
+  }
+  
+  public ReadAheadIterator() {}
+  
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new ReadAheadIterator(this, env);
+  }
+  
+  public Key getTopKey() {
+    return currentElement.getKey();
+  }
+  
+  public Value getTopValue() {
+    return currentElement.getValue();
+  }
+  
+  public boolean hasTop() {
+    if (currentElement == noMoreDataElement)
+      return false;
+    return currentElement != null || queue.size() > 0 || source.hasTop();
+  }
+  
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    validateOptions(options);
+    this.source = source;
+    queue = new ArrayBlockingQueue<QueueElement>(queueSize);
+    thread = new ProducerThread(this.source);
+    t = new Thread(thread, "ReadAheadIterator-SourceThread");
+    t.start();
+  }
+  
+  /**
+   * Populate the key and value
+   */
+  public void next() throws IOException {
+    // Thread startup race condition, need to make sure that the
+    // thread has started before we call this the first time.
+    while (t.getState().equals(State.NEW)) {
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException e) {}
+    }
+    
+    if (t.getState().equals(State.TERMINATED)) {
+      // Thread encountered an error.
+      if (thread.hasError()) {
+        // and it should
+        throw new IOException("Background thread has died", thread.getError());
+      }
+    }
+    
+    // Pull an element off the queue, this will wait if there is no data yet.
+    try {
+      if (thread.hasError())
+        throw new IOException("background thread has error", thread.getError());
+      
+      QueueElement nextElement = null;
+      while (null == nextElement) {
+        try {
+          nextElement = queue.poll(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          // TODO: Do we need to do anything here?
+        }
+        if (null == nextElement) {
+          // Then we have no data and timed out, check for error condition in the read ahead thread
+          if (thread.hasError()) {
+            throw new IOException("background thread has error", thread.getError());
+          }
+        }
+      }
+      currentElement = nextElement;
+    } catch (IOException e) {
+      throw new IOException("Error getting element from source iterator", e);
+    }
+  }
+  
+  /**
+   * Seek to the next matching cell and call next to populate the key and value.
+   */
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    if (t.isAlive()) {
+      // Check for error
+      if (thread.hasError())
+        throw new IOException("background thread has error", thread.getError());
+      
+      try {
+        // Acquire the lock, or wait until it is unlocked by the producer thread.
+        thread.lock();
+        queue.clear();
+        currentElement = null;
+        source.seek(range, columnFamilies, inclusive);
+      } finally {
+        thread.unlock();
+      }
+      next();
+    } else {
+      throw new IOException("source iterator thread has died.");
+    }
+  }
+  
+  public IteratorOptions describeOptions() {
+    Map<String,String> options = new HashMap<String,String>();
+    options.put(QUEUE_SIZE, "read ahead queue size");
+    options.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
+    return new IteratorOptions(getClass().getSimpleName(), "Iterator that puts the source in another thread", options, null);
+  }
+  
+  public boolean validateOptions(Map<String,String> options) {
+    if (options.containsKey(QUEUE_SIZE))
+      queueSize = Integer.parseInt(options.get(QUEUE_SIZE));
+    if (options.containsKey(TIMEOUT))
+      timeout = Integer.parseInt(options.get(TIMEOUT));
+    return true;
+  }
+  
 }