You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC
svn commit: r1210600 [8/16] - in
/incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/
ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/
ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/
ingest/src/main/java/protobuf/ ...
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package iterator;
import java.io.IOException;
@@ -35,796 +35,792 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
/**
- * An iterator that handles "OR" query constructs on the server side.
- * This code has been adapted/merged from Heap and Multi Iterators.
+ * An iterator that handles "OR" query constructs on the server side. This code has been adapted/merged from Heap and Multi Iterators.
*/
-public class OrIterator implements SortedKeyValueIterator<Key, Value> {
-
- private TermSource currentTerm;
- private ArrayList<TermSource> sources;
- private PriorityQueue<TermSource> sorted = new PriorityQueue<TermSource>(5);
- private static final Text nullText = new Text();
- private Key topKey = null;
- private Range overallRange;
- private Collection<ByteSequence> columnFamilies;
- private boolean inclusive;
- protected static final Logger log = Logger.getLogger(OrIterator.class);
- private Text parentEndRow;
-
- protected static class TermSource implements Comparable<TermSource> {
-
- public SortedKeyValueIterator<Key, Value> iter;
- public Text dataLocation;
- public Text term;
- public Text docid;
- public Text fieldTerm;
- public Key topKey;
- public boolean atEnd;
-
- public TermSource(TermSource other) {
- this.iter = other.iter;
- this.term = other.term;
- this.dataLocation = other.dataLocation;
- this.atEnd = other.atEnd;
- }
-
- public TermSource(SortedKeyValueIterator<Key, Value> iter, Text term) {
- this.iter = iter;
- this.term = term;
- this.atEnd = false;
- }
-
- public TermSource(SortedKeyValueIterator<Key, Value> iter, Text dataLocation, Text term) {
- this.iter = iter;
- this.dataLocation = dataLocation;
- this.term = term;
- this.atEnd = false;
- }
-
- public void setNew() {
- if (!this.atEnd && this.iter.hasTop()) {
- this.topKey = this.iter.getTopKey();
-
- if (log.isDebugEnabled()) {
- log.debug("OI.TermSource.setNew TS.iter.topKey >>" + topKey + "<<");
- }
-
- if (this.term == null) {
- this.docid = this.topKey.getColumnQualifier();
- } else {
- String cqString = this.topKey.getColumnQualifier().toString();
-
- int idx = cqString.indexOf("\0");
- this.fieldTerm = new Text(cqString.substring(0, idx));
- this.docid = new Text(cqString.substring(idx + 1));
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("OI.TermSource.setNew Setting to null...");
- }
-
- //this.term = null;
- //this.dataLocation = null;
- this.topKey = null;
- this.fieldTerm = null;
- this.docid = null;
- }
- }
-
- public int compareTo(TermSource o) {
- // NOTE: If your implementation can have more than one row in a tablet,
- // you must compare row key here first, then column qualifier.
- // NOTE2: A null check is not needed because things are only added to the
- // sorted after they have been determined to be valid.
- //return this.docid.compareTo(o.docid);
- //return this.topKey.compareTo(o.topKey);
-
-
- // NOTE! We need to compare UID's, not Keys!
- Key k1 = topKey;
- Key k2 = o.topKey;
- //return t1.compareTo(t2);
- String uid1 = getUID(k1);
- String uid2 = getUID(k2);
-
- if (uid1 != null && uid2 != null) {
- return uid1.compareTo(uid2);
- } else if (uid1 == null && uid2 == null) {
- return 0;
- } else if (uid1 == null) {
- return 1;
- } else {
- return -1;
- }
-
- }
-
- @Override
- public String toString() {
- return "TermSource: " + this.dataLocation + " " + this.term;
- }
-
- public boolean hasTop() {
- return this.topKey != null;
- }
- }
-
- /**
- * Returns the given key's row
- * @param key
- * @return
- */
- protected Text getPartition(Key key) {
- return key.getRow();
- }
-
- /**
- * Returns the given key's dataLocation
- * @param key
- * @return
- */
- protected Text getDataLocation(Key key) {
- return key.getColumnFamily();
- }
-
- /**
- * Returns the given key's term
- * @param key
- * @return
- */
- protected Text getTerm(Key key) {
- int idx = 0;
- String sKey = key.getColumnQualifier().toString();
-
- idx = sKey.indexOf("\0");
- return new Text(sKey.substring(0, idx));
- }
-
- /**
- * Returns the given key's DocID
- * @param key
- * @return
- */
- protected Text getDocID(Key key) {
- int idx = 0;
- String sKey = key.getColumnQualifier().toString();
-
- idx = sKey.indexOf("\0");
- return new Text(sKey.substring(idx + 1));
- }
-
- /**
- * Returns the given key's UID
- * @param key
- * @return
- */
- static protected String getUID(Key key) {
- try {
- int idx = 0;
- String sKey = key.getColumnQualifier().toString();
-
- idx = sKey.lastIndexOf("\0");
- return sKey.substring(idx + 1);
- } catch (Exception e) {
- return null;
- }
- }
-
- public OrIterator() {
- this.sources = new ArrayList<TermSource>();
- }
-
- private OrIterator(OrIterator other, IteratorEnvironment env) {
- this.sources = new ArrayList<TermSource>();
-
- for (TermSource TS : other.sources) {
- this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.dataLocation, TS.term));
- }
- }
-
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- return new OrIterator(this, env);
- }
-
- public void addTerm(SortedKeyValueIterator<Key, Value> source, Text term, IteratorEnvironment env) {
- if (log.isDebugEnabled()) {
- log.debug("OI.addTerm Added source w/o family");
- log.debug("OI.addTerm term >>" + term + "<<");
- }
-
- // Don't deepcopy an iterator
- if (term == null) {
- this.sources.add(new TermSource(source, term));
+public class OrIterator implements SortedKeyValueIterator<Key,Value> {
+
+ private TermSource currentTerm;
+ private ArrayList<TermSource> sources;
+ private PriorityQueue<TermSource> sorted = new PriorityQueue<TermSource>(5);
+ private static final Text nullText = new Text();
+ private Key topKey = null;
+ private Range overallRange;
+ private Collection<ByteSequence> columnFamilies;
+ private boolean inclusive;
+ protected static final Logger log = Logger.getLogger(OrIterator.class);
+ private Text parentEndRow;
+
+ protected static class TermSource implements Comparable<TermSource> {
+
+ public SortedKeyValueIterator<Key,Value> iter;
+ public Text dataLocation;
+ public Text term;
+ public Text docid;
+ public Text fieldTerm;
+ public Key topKey;
+ public boolean atEnd;
+
+ public TermSource(TermSource other) {
+ this.iter = other.iter;
+ this.term = other.term;
+ this.dataLocation = other.dataLocation;
+ this.atEnd = other.atEnd;
+ }
+
+ public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
+ this.iter = iter;
+ this.term = term;
+ this.atEnd = false;
+ }
+
+ public TermSource(SortedKeyValueIterator<Key,Value> iter, Text dataLocation, Text term) {
+ this.iter = iter;
+ this.dataLocation = dataLocation;
+ this.term = term;
+ this.atEnd = false;
+ }
+
+ public void setNew() {
+ if (!this.atEnd && this.iter.hasTop()) {
+ this.topKey = this.iter.getTopKey();
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.TermSource.setNew TS.iter.topKey >>" + topKey + "<<");
+ }
+
+ if (this.term == null) {
+ this.docid = this.topKey.getColumnQualifier();
} else {
- this.sources.add(new TermSource(source.deepCopy(env), term));
- }
- }
-
- public void addTerm(SortedKeyValueIterator<Key, Value> source, Text dataLocation, Text term, IteratorEnvironment env) {
- if (log.isDebugEnabled()) {
- log.debug("OI.addTerm Added source ");
- log.debug("OI.addTerm family >>" + dataLocation + "<< term >>" + term + "<<");
- }
-
- // Don't deepcopy an iterator
- if (term == null) {
- this.sources.add(new TermSource(source, dataLocation, term));
+ String cqString = this.topKey.getColumnQualifier().toString();
+
+ int idx = cqString.indexOf("\0");
+ this.fieldTerm = new Text(cqString.substring(0, idx));
+ this.docid = new Text(cqString.substring(idx + 1));
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.TermSource.setNew Setting to null...");
+ }
+
+ // this.term = null;
+ // this.dataLocation = null;
+ this.topKey = null;
+ this.fieldTerm = null;
+ this.docid = null;
+ }
+ }
+
+ public int compareTo(TermSource o) {
+ // NOTE: If your implementation can have more than one row in a tablet,
+ // you must compare row key here first, then column qualifier.
+ // NOTE2: A null check is not needed because things are only added to the
+ // sorted after they have been determined to be valid.
+ // return this.docid.compareTo(o.docid);
+ // return this.topKey.compareTo(o.topKey);
+
+ // NOTE! We need to compare UID's, not Keys!
+ Key k1 = topKey;
+ Key k2 = o.topKey;
+ // return t1.compareTo(t2);
+ String uid1 = getUID(k1);
+ String uid2 = getUID(k2);
+
+ if (uid1 != null && uid2 != null) {
+ return uid1.compareTo(uid2);
+ } else if (uid1 == null && uid2 == null) {
+ return 0;
+ } else if (uid1 == null) {
+ return 1;
+ } else {
+ return -1;
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ return "TermSource: " + this.dataLocation + " " + this.term;
+ }
+
+ public boolean hasTop() {
+ return this.topKey != null;
+ }
+ }
+
+ /**
+ * Returns the given key's row
+ *
+ * @param key
+ * @return
+ */
+ protected Text getPartition(Key key) {
+ return key.getRow();
+ }
+
+ /**
+ * Returns the given key's dataLocation
+ *
+ * @param key
+ * @return
+ */
+ protected Text getDataLocation(Key key) {
+ return key.getColumnFamily();
+ }
+
+ /**
+ * Returns the given key's term
+ *
+ * @param key
+ * @return
+ */
+ protected Text getTerm(Key key) {
+ int idx = 0;
+ String sKey = key.getColumnQualifier().toString();
+
+ idx = sKey.indexOf("\0");
+ return new Text(sKey.substring(0, idx));
+ }
+
+ /**
+ * Returns the given key's DocID
+ *
+ * @param key
+ * @return
+ */
+ protected Text getDocID(Key key) {
+ int idx = 0;
+ String sKey = key.getColumnQualifier().toString();
+
+ idx = sKey.indexOf("\0");
+ return new Text(sKey.substring(idx + 1));
+ }
+
+ /**
+ * Returns the given key's UID
+ *
+ * @param key
+ * @return
+ */
+ static protected String getUID(Key key) {
+ try {
+ int idx = 0;
+ String sKey = key.getColumnQualifier().toString();
+
+ idx = sKey.lastIndexOf("\0");
+ return sKey.substring(idx + 1);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ public OrIterator() {
+ this.sources = new ArrayList<TermSource>();
+ }
+
+ private OrIterator(OrIterator other, IteratorEnvironment env) {
+ this.sources = new ArrayList<TermSource>();
+
+ for (TermSource TS : other.sources) {
+ this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.dataLocation, TS.term));
+ }
+ }
+
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new OrIterator(this, env);
+ }
+
+ public void addTerm(SortedKeyValueIterator<Key,Value> source, Text term, IteratorEnvironment env) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.addTerm Added source w/o family");
+ log.debug("OI.addTerm term >>" + term + "<<");
+ }
+
+ // Don't deepcopy an iterator
+ if (term == null) {
+ this.sources.add(new TermSource(source, term));
+ } else {
+ this.sources.add(new TermSource(source.deepCopy(env), term));
+ }
+ }
+
+ public void addTerm(SortedKeyValueIterator<Key,Value> source, Text dataLocation, Text term, IteratorEnvironment env) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.addTerm Added source ");
+ log.debug("OI.addTerm family >>" + dataLocation + "<< term >>" + term + "<<");
+ }
+
+ // Don't deepcopy an iterator
+ if (term == null) {
+ this.sources.add(new TermSource(source, dataLocation, term));
+ } else {
+ this.sources.add(new TermSource(source.deepCopy(env), dataLocation, term));
+ }
+ }
+
+ /**
+ * Construct the topKey given the current <code>TermSource</code>
+ *
+ * @param TS
+ * @return
+ */
+ protected Key buildTopKey(TermSource TS) {
+ if ((TS == null) || (TS.topKey == null)) {
+ return null;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.buildTopKey New topKey >>" + new Key(TS.topKey.getRow(), TS.dataLocation, TS.docid) + "<<");
+ }
+
+ return new Key(TS.topKey.getRow(), TS.topKey.getColumnFamily(), TS.topKey.getColumnQualifier());
+ }
+
+ final public void next() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.next Enter: sorted.size = " + sorted.size() + " currentTerm = " + ((currentTerm == null) ? "null" : "not null"));
+ }
+
+ if (currentTerm == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.next currentTerm is NULL... returning");
+ }
+
+ topKey = null;
+ return;
+ }
+
+ // Advance currentTerm
+ currentTerm.iter.next();
+
+ advanceToMatch(currentTerm);
+
+ currentTerm.setNew();
+
+ // See if currentTerm is still valid, remove if not
+ if (log.isDebugEnabled()) {
+ log.debug("OI.next Checks (correct = 0,0,0): " + ((currentTerm.topKey != null) ? "0," : "1,") + ((currentTerm.dataLocation != null) ? "0," : "1,")
+ + ((currentTerm.term != null && currentTerm.fieldTerm != null) ? (currentTerm.term.compareTo(currentTerm.fieldTerm)) : "0"));
+ }
+
+ if (currentTerm.topKey == null || ((currentTerm.dataLocation != null) && (currentTerm.term.compareTo(currentTerm.fieldTerm) != 0))) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.next removing entry:" + currentTerm.term);
+ }
+
+ currentTerm = null;
+ }
+
+ // optimization.
+ // if size == 0, currentTerm is the only item left,
+ // OR there are no items left.
+ // In either case, we don't need to use the PriorityQueue
+ if (sorted.size() > 0) {
+ // sort the term back in
+ if (currentTerm != null) {
+ sorted.add(currentTerm);
+ }
+ // and get the current top item out.
+ currentTerm = sorted.poll();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.next CurrentTerm is " + ((currentTerm == null) ? "null" : currentTerm));
+ }
+
+ topKey = buildTopKey(currentTerm);
+
+ if (hasTop()) {
+ if (overallRange != null && !overallRange.contains(topKey)) {
+ topKey = null;
+ }
+ }
+ }
+
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+
+ overallRange = new Range(range);
+ if (log.isDebugEnabled()) {
+ log.debug("seek, overallRange: " + overallRange);
+ }
+
+ // if (range.getStartKey() != null && range.getStartKey().getRow() != null) {
+ // this.parentStartRow = range.getStartKey().getRow();
+ // }
+
+ if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
+ this.parentEndRow = range.getEndKey().getRow();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.seek Entry - sources.size = " + sources.size());
+ log.debug("OI.seek Entry - currentTerm = " + ((currentTerm == null) ? "false" : currentTerm.iter.getTopKey()));
+ log.debug("OI.seek Entry - Key from Range = " + ((range == null) ? "false" : range.getStartKey()));
+ }
+
+ // If sources.size is 0, there is nothing to process, so just return.
+ if (sources.isEmpty()) {
+ currentTerm = null;
+ topKey = null;
+ return;
+ }
+
+ this.columnFamilies = columnFamilies;
+ this.inclusive = inclusive;
+
+ Range newRange = range;
+ Key sourceKey = null;
+ Key startKey = null;
+
+ if (range != null) {
+ startKey = range.getStartKey();
+ }
+
+ // Clear the PriorityQueue so that we can re-populate it.
+ sorted.clear();
+
+ TermSource TS = null;
+ Iterator<TermSource> iter = sources.iterator();
+ // For each term, seek forward.
+ // if a hit is not found, delete it from future searches.
+ int counter = 1;
+ while (iter.hasNext()) {
+ TS = iter.next();
+
+ TS.atEnd = false;
+
+ if (sources.size() == 1) {
+ currentTerm = TS;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.seek on TS >>" + TS + "<<");
+ log.debug("OI.seek seeking source >>" + counter + "<< ");
+ }
+
+ counter++;
+
+ newRange = range;
+ sourceKey = null;
+
+ if (startKey != null) {
+ // Construct the new key for the range
+ if (log.isDebugEnabled()) {
+ log.debug("OI.seek startKey >>" + startKey + "<<");
+ }
+
+ if (startKey.getColumnQualifier() != null) {
+ sourceKey = new Key(startKey.getRow(), (TS.dataLocation == null) ? nullText : TS.dataLocation, new Text(((TS.term == null) ? "" : TS.term + "\0")
+ + range.getStartKey().getColumnQualifier()));
} else {
- this.sources.add(new TermSource(source.deepCopy(env), dataLocation, term));
+ sourceKey = new Key(startKey.getRow(), (TS.dataLocation == null) ? nullText : TS.dataLocation, (TS.term == null) ? nullText : TS.term);
}
- }
-
- /**
- * Construct the topKey given the current <code>TermSource</code>
- * @param TS
- * @return
- */
- protected Key buildTopKey(TermSource TS) {
- if ((TS == null) || (TS.topKey == null)) {
- return null;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("OI.buildTopKey New topKey >>" + new Key(TS.topKey.getRow(), TS.dataLocation, TS.docid) + "<<");
- }
-
- return new Key(TS.topKey.getRow(), TS.topKey.getColumnFamily(), TS.topKey.getColumnQualifier());
- }
-
- final public void next() throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("OI.next Enter: sorted.size = " + sorted.size() + " currentTerm = "
- + ((currentTerm == null) ? "null" : "not null"));
- }
-
- if (currentTerm == null) {
- if (log.isDebugEnabled()) {
- log.debug("OI.next currentTerm is NULL... returning");
- }
-
- topKey = null;
- return;
- }
-
- // Advance currentTerm
- currentTerm.iter.next();
-
- advanceToMatch(currentTerm);
-
- currentTerm.setNew();
-
- // See if currentTerm is still valid, remove if not
+
if (log.isDebugEnabled()) {
- log.debug("OI.next Checks (correct = 0,0,0): "
- + ((currentTerm.topKey != null) ? "0," : "1,")
- + ((currentTerm.dataLocation != null) ? "0," : "1,")
- + ((currentTerm.term != null && currentTerm.fieldTerm != null) ? (currentTerm.term.compareTo(currentTerm.fieldTerm)) : "0"));
- }
-
- if (currentTerm.topKey == null || ((currentTerm.dataLocation != null) && (currentTerm.term.compareTo(currentTerm.fieldTerm) != 0))) {
- if (log.isDebugEnabled()) {
- log.debug("OI.next removing entry:" + currentTerm.term);
- }
-
- currentTerm = null;
- }
-
-
- // optimization.
- // if size == 0, currentTerm is the only item left,
- // OR there are no items left.
- // In either case, we don't need to use the PriorityQueue
- if (sorted.size() > 0) {
- // sort the term back in
- if (currentTerm != null) {
- sorted.add(currentTerm);
- }
- // and get the current top item out.
- currentTerm = sorted.poll();
+ log.debug("OI.seek Seeking to the key => " + sourceKey);
}
-
+
+ newRange = new Range(sourceKey, true, sourceKey.followingKey(PartialKey.ROW), false);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.seek Using the range Seek() argument to seek => " + newRange);
+ }
+ }
+
+ TS.iter.seek(newRange, columnFamilies, inclusive);
+
+ TS.setNew();
+
+ // Make sure we're on a key with the correct dataLocation and term
+ advanceToMatch(TS);
+
+ TS.setNew();
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.seek sourceKey >>" + sourceKey + "<< ");
+ log.debug("OI.seek topKey >>" + ((TS.topKey == null) ? "false" : TS.topKey) + "<< ");
+ log.debug("OI.seek TS.fieldTerm == " + TS.fieldTerm);
+
+ log.debug("OI.seek Checks (correct = 0,0,0 / 0,1,1): " + ((TS.topKey != null) ? "0," : "1,") + ((TS.dataLocation != null) ? "0," : "1,")
+ + (((TS.term != null && TS.fieldTerm != null) && (TS.term.compareTo(TS.fieldTerm) != 0)) ? "0" : "1"));
+ }
+
+ if ((TS.topKey == null) || ((TS.dataLocation != null) && (TS.term.compareTo(TS.fieldTerm) != 0))) {
+ // log.debug("OI.seek Removing " + TS.term);
+ // iter.remove();
+ } // Optimization if we only have one element
+ else if (sources.size() > 0 || iter.hasNext()) {
+ // We have more than one source to search for, use the priority queue
+ sorted.add(TS);
+ } else {
+ // Don't need to continue, only had one item to search
if (log.isDebugEnabled()) {
- log.debug("OI.next CurrentTerm is " + ((currentTerm == null) ? "null" : currentTerm));
+ log.debug("OI.seek new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
}
-
- topKey = buildTopKey(currentTerm);
-
+
+ // make sure it is in the range if we have one.
if (hasTop()) {
- if (overallRange != null && !overallRange.contains(topKey)) {
- topKey = null;
- }
- }
- }
-
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-
- overallRange = new Range(range);
- if(log.isDebugEnabled()){
- log.debug("seek, overallRange: "+overallRange);
- }
-
-// if (range.getStartKey() != null && range.getStartKey().getRow() != null) {
-// this.parentStartRow = range.getStartKey().getRow();
-// }
-
- if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
- this.parentEndRow = range.getEndKey().getRow();
- }
-
- if (log.isDebugEnabled()) {
- log.debug("OI.seek Entry - sources.size = " + sources.size());
- log.debug("OI.seek Entry - currentTerm = " + ((currentTerm == null) ? "false" : currentTerm.iter.getTopKey()));
- log.debug("OI.seek Entry - Key from Range = " + ((range == null) ? "false" : range.getStartKey()));
- }
-
- // If sources.size is 0, there is nothing to process, so just return.
- if (sources.isEmpty()) {
- currentTerm = null;
- topKey = null;
- return;
- }
-
- this.columnFamilies = columnFamilies;
- this.inclusive = inclusive;
-
-
- Range newRange = range;
- Key sourceKey = null;
- Key startKey = null;
-
- if (range != null) {
- startKey = range.getStartKey();
- }
-
- // Clear the PriorityQueue so that we can re-populate it.
- sorted.clear();
-
- TermSource TS = null;
- Iterator<TermSource> iter = sources.iterator();
- // For each term, seek forward.
- // if a hit is not found, delete it from future searches.
- int counter = 1;
- while (iter.hasNext()) {
- TS = iter.next();
-
- TS.atEnd = false;
-
- if (sources.size() == 1) {
- currentTerm = TS;
- }
-
+ if (overallRange != null && !overallRange.contains(topKey)) {
if (log.isDebugEnabled()) {
- log.debug("OI.seek on TS >>" + TS + "<<");
- log.debug("OI.seek seeking source >>" + counter + "<< ");
+ log.debug("seek, topKey: " + topKey + " is not in the overallRange: " + overallRange);
}
-
- counter++;
-
- newRange = range;
- sourceKey = null;
-
- if (startKey != null) {
- // Construct the new key for the range
- if (log.isDebugEnabled()) {
- log.debug("OI.seek startKey >>" + startKey + "<<");
- }
-
- if (startKey.getColumnQualifier() != null) {
- sourceKey = new Key(startKey.getRow(),
- (TS.dataLocation == null) ? nullText : TS.dataLocation,
- new Text(((TS.term == null) ? "" : TS.term + "\0") + range.getStartKey().getColumnQualifier()));
- } else {
- sourceKey = new Key(startKey.getRow(),
- (TS.dataLocation == null) ? nullText : TS.dataLocation,
- (TS.term == null) ? nullText : TS.term);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("OI.seek Seeking to the key => " + sourceKey);
- }
-
- newRange = new Range(sourceKey, true, sourceKey.followingKey(PartialKey.ROW), false);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("OI.seek Using the range Seek() argument to seek => " + newRange);
- }
- }
-
- TS.iter.seek(newRange, columnFamilies, inclusive);
-
- TS.setNew();
-
- // Make sure we're on a key with the correct dataLocation and term
- advanceToMatch(TS);
-
- TS.setNew();
-
- if (log.isDebugEnabled()) {
- log.debug("OI.seek sourceKey >>" + sourceKey + "<< ");
- log.debug("OI.seek topKey >>" + ((TS.topKey == null) ? "false" : TS.topKey) + "<< ");
- log.debug("OI.seek TS.fieldTerm == " + TS.fieldTerm);
-
- log.debug("OI.seek Checks (correct = 0,0,0 / 0,1,1): "
- + ((TS.topKey != null) ? "0," : "1,")
- + ((TS.dataLocation != null) ? "0," : "1,")
- + (((TS.term != null && TS.fieldTerm != null) && (TS.term.compareTo(TS.fieldTerm) != 0)) ? "0" : "1"));
- }
-
- if ((TS.topKey == null) || ((TS.dataLocation != null) && (TS.term.compareTo(TS.fieldTerm) != 0))) {
- //log.debug("OI.seek Removing " + TS.term);
- //iter.remove();
- } // Optimization if we only have one element
- else if (sources.size() > 0 || iter.hasNext()) {
- // We have more than one source to search for, use the priority queue
- sorted.add(TS);
- } else {
- // Don't need to continue, only had one item to search
- if (log.isDebugEnabled()) {
- log.debug("OI.seek new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
- }
-
- // make sure it is in the range if we have one.
- if (hasTop()) {
- if (overallRange != null && !overallRange.contains(topKey)) {
- if (log.isDebugEnabled()) {
- log.debug("seek, topKey: " + topKey + " is not in the overallRange: " + overallRange);
- }
- topKey = null;
- }
- }
- return;
- }
- }
-
- // And set currentTerm = the next valid key/term.
- currentTerm = sorted.poll();
-
- if (log.isDebugEnabled()) {
- log.debug("OI.seek currentTerm = " + currentTerm);
- }
-
- topKey = buildTopKey(currentTerm);
- if (topKey == null) {
- if (log.isDebugEnabled()) {
- log.debug("OI.seek() topKey is null");
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("OI.seek new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
- }
-
- if (hasTop()) {
- if (overallRange != null && !overallRange.contains(topKey)) {
- if (log.isDebugEnabled()) {
- log.debug("seek, topKey: " + topKey + " is not in the overallRange: " + overallRange);
- }
- topKey = null;
- }
- }
-
- }
-
- final public Key getTopKey() {
- if (log.isDebugEnabled()) {
- log.debug("OI.getTopKey key >>" + topKey);
- }
-
- return topKey;
- }
-
- final public Value getTopValue() {
- if (log.isDebugEnabled()) {
- log.debug("OI.getTopValue key >>" + currentTerm.iter.getTopValue());
- }
-
- return currentTerm.iter.getTopValue();
- }
-
- final public boolean hasTop() {
- if (log.isDebugEnabled()) {
- log.debug("OI.hasTop = " + ((topKey == null) ? "false" : "true"));
+ topKey = null;
+ }
}
-
- return topKey != null;
+ return;
+ }
}
-
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Ensures that the current <code>TermSource</code> is pointing to a key with the correct <code>dataLocation</code> and
- * <code>term</code> or sets <code>topKey</code> to null if there is no such key remaining.
- * @param TS The <code>TermSource</code> to advance
- * @throws IOException
- */
- private void advanceToMatch(TermSource TS) throws IOException {
- boolean matched = false;
- while (!matched) {
- if (!TS.iter.hasTop()) {
- TS.topKey = null;
- return;
- }
-
- Key iterTopKey = TS.iter.getTopKey();
-
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch current topKey = " + iterTopKey);
- }
-
- // we should compare the row to the end of the range
- if (overallRange.getEndKey() != null) {
-
- if (overallRange != null && !overallRange.contains(TS.iter.getTopKey())) {
- if (log.isDebugEnabled()) {
- log.debug("overallRange: " + overallRange + " does not contain TS.iter.topKey: " + TS.iter.getTopKey());
- log.debug("OI.advanceToMatch at the end, returning");
- }
-
- TS.atEnd = true;
- TS.topKey = null;
-
- return;
- } else {
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch not at the end");
- }
- }
+
+ // And set currentTerm = the next valid key/term.
+ currentTerm = sorted.poll();
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.seek currentTerm = " + currentTerm);
+ }
+
+ topKey = buildTopKey(currentTerm);
+ if (topKey == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.seek() topKey is null");
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.seek new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
+ }
+
+ if (hasTop()) {
+ if (overallRange != null && !overallRange.contains(topKey)) {
+ if (log.isDebugEnabled()) {
+ log.debug("seek, topKey: " + topKey + " is not in the overallRange: " + overallRange);
+ }
+ topKey = null;
+ }
+ }
+
+ }
+
+ final public Key getTopKey() {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.getTopKey key >>" + topKey);
+ }
+
+ return topKey;
+ }
+
+ final public Value getTopValue() {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.getTopValue key >>" + currentTerm.iter.getTopValue());
+ }
+
+ return currentTerm.iter.getTopValue();
+ }
+
+ final public boolean hasTop() {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.hasTop = " + ((topKey == null) ? "false" : "true"));
+ }
+
+ return topKey != null;
+ }
+
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Ensures that the given <code>TermSource</code> is pointing to a key with the correct <code>dataLocation</code> and <code>term</code>, or sets its
+ * <code>topKey</code> to null if there is no such key remaining.
+ *
+ * The source's iterator is re-seeked forward until its top key matches both the data location and the term. If <code>overallRange</code> is set, has
+ * an end key and no longer contains the iterator's top key, the source is marked <code>atEnd</code> and its <code>topKey</code> is cleared.
+ *
+ * @param TS
+ * The <code>TermSource</code> to advance
+ * @throws IOException
+ * if seeking the underlying iterator fails
+ */
+ private void advanceToMatch(TermSource TS) throws IOException {
+ while (true) {
+ if (!TS.iter.hasTop()) {
+ TS.topKey = null;
+ return;
+ }
+
+ Key iterTopKey = TS.iter.getTopKey();
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch current topKey = " + iterTopKey);
+ }
+
+ // we should compare the row to the end of the range. overallRange itself may be null (jump() checks for that case),
+ // so it has to be tested before it is dereferenced.
+ if (overallRange != null && overallRange.getEndKey() != null) {
+ if (!overallRange.contains(iterTopKey)) {
+ if (log.isDebugEnabled()) {
+ log.debug("overallRange: " + overallRange + " does not contain TS.iter.topKey: " + iterTopKey);
+ log.debug("OI.advanceToMatch at the end, returning");
+ }
+
+ TS.atEnd = true;
+ TS.topKey = null;
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch not at the end");
+ }
+ } else if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch overallRange.getEndKey() == null");
+ }
+
+ // Advance to the correct dataLocation
+ if (log.isDebugEnabled()) {
+ log.debug("Comparing dataLocations.");
+ log.debug("OI.advanceToMatch dataLocationCompare: " + getDataLocation(iterTopKey) + " == " + TS.dataLocation);
+ }
+
+ int dataLocationCompare = getDataLocation(iterTopKey).compareTo(TS.dataLocation);
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch dataLocationCompare = " + dataLocationCompare);
+ }
+
+ // Make sure we're at a row for this dataLocation
+ if (dataLocationCompare < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch seek to desired dataLocation");
+ }
+ seekTermSourceFrom(TS, new Key(iterTopKey.getRow(), TS.dataLocation, nullText));
+ continue;
+ }
+ if (dataLocationCompare > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch advanced beyond desired dataLocation, seek to next row");
+ }
+ // Gone past the current dataLocation, seek to the next row
+ seekTermSourceFrom(TS, iterTopKey.followingKey(PartialKey.ROW));
+ continue;
+ }
+
+ // Advance to the correct term
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch termCompare: " + getTerm(iterTopKey) + " == " + TS.term);
+ }
+
+ int termCompare = getTerm(iterTopKey).compareTo(TS.term);
+
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch termCompare = " + termCompare);
+ }
+
+ // Make sure we're at a row for this term
+ if (termCompare < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch seek to desired term");
+ }
+ seekTermSourceFrom(TS, new Key(iterTopKey.getRow(), iterTopKey.getColumnFamily(), TS.term));
+ continue;
+ }
+ if (termCompare > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch advanced beyond desired term, seek to next row");
+ }
+ // Gone past the current term, seek to the next row
+ seekTermSourceFrom(TS, iterTopKey.followingKey(PartialKey.ROW));
+ continue;
+ }
+
+ // If we made it here, we found a match
+ return;
+ }
+ }
+
+ /**
+ * Seeks the iterator of <code>TS</code> to an open-ended range that starts (inclusively) at <code>seekKey</code>, using this iterator's
+ * <code>columnFamilies</code> and <code>inclusive</code> settings.
+ *
+ * @param TS
+ * The <code>TermSource</code> whose iterator is re-positioned
+ * @param seekKey
+ * the first key of the new range
+ * @throws IOException
+ * if the seek fails
+ */
+ private void seekTermSourceFrom(TermSource TS, Key seekKey) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("OI.advanceToMatch seeking to => " + seekKey);
+ }
+ TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
+ }
+
+ /**
+ * Moves every <code>TermSource</code> forward so that none is positioned before <code>jumpKey</code> (compared by row, then by uid within the same
+ * row), re-populates the priority queue of sources and rebuilds <code>topKey</code>.
+ *
+ * @param jumpKey
+ * the key to jump to
+ * @return false if the row of <code>jumpKey</code> is beyond <code>parentEndRow</code>; otherwise whether this iterator has a top key after the jump
+ * @throws IOException
+ * if seeking an underlying iterator fails
+ */
+ public boolean jump(Key jumpKey) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("OR jump: " + jumpKey);
+ printTopKeysForTermSources();
+ }
+
+ // is the jumpKey outside my overall range?
+ if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
+ // can't go there.
+ if (log.isDebugEnabled()) {
+ log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
+ }
+ return false;
+ }
+
+ // Clear the PriorityQueue so that we can re-populate it.
+ sorted.clear();
+
+ // check each term source and jump it if necessary.
+ for (TermSource ts : sources) {
+ int comp;
+ if (!ts.hasTop()) {
+ if (log.isDebugEnabled()) {
+ log.debug("jump called, but ts.topKey is null, this one needs to move to next row.");
+ }
+ Key endKey = null;
+ if (parentEndRow != null) {
+ endKey = new Key(parentEndRow);
+ }
+ Range newRange = new Range(jumpKey, true, endKey, false);
+ ts.iter.seek(newRange, columnFamilies, inclusive);
+ ts.setNew();
+ advanceToMatch(ts);
+ ts.setNew();
+
+ } else {
+ // check row, then uid
+ // Compare this source's own top key; the iterator-wide topKey may belong to a different source or be null.
+ comp = ts.topKey.getRow().compareTo(jumpKey.getRow());
+ if (comp > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("jump, our row is ahead of jumpKey.");
+ log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + ts.topKey.getRow() + " parentEndRow" + parentEndRow);
+ }
+ // do nothing, we're ahead of jumpKey row and have topkey. The source is queued by the validation step below;
+ // adding it here as well would put the same TermSource into the PriorityQueue twice.
+ } else if (comp < 0) { // a row behind jump key, need to move forward
+ if (log.isDebugEnabled()) {
+ log.debug("OR jump, row jump");
+ }
+ Key endKey = null;
+ if (parentEndRow != null) {
+ endKey = new Key(parentEndRow);
+ }
+ Key sKey = new Key(jumpKey.getRow());
+ Range fake = new Range(sKey, true, endKey, false);
+ ts.iter.seek(fake, columnFamilies, inclusive);
+ ts.setNew();
+ advanceToMatch(ts);
+ ts.setNew();
+ } else {
+ // need to check uid
+ String myUid = getUID(ts.topKey);
+ String jumpUid = getUID(jumpKey);
+
+ if (log.isDebugEnabled()) {
+ if (myUid == null) {
+ log.debug("myUid is null");
} else {
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch overallRange.getEndKey() == null");
- }
- }
-
- // Advance to the correct dataLocation
- if (log.isDebugEnabled()) {
- log.debug("Comparing dataLocations.");
- log.debug("OI.advanceToMatch dataLocationCompare: " + getDataLocation(iterTopKey) + " == " + TS.dataLocation);
- }
-
- int dataLocationCompare = getDataLocation(iterTopKey).compareTo(TS.dataLocation);
-
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch dataLocationCompare = " + dataLocationCompare);
- }
-
- // Make sure we're at a row for this dataLocation
- if (dataLocationCompare < 0) {
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch seek to desired dataLocation");
- }
-
- Key seekKey = new Key(iterTopKey.getRow(), TS.dataLocation, nullText);
-
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch seeking to => " + seekKey);
- }
-
- TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
-
- continue;
- } else if (dataLocationCompare > 0) {
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch advanced beyond desired dataLocation, seek to next row");
- }
-
- // Gone past the current dataLocation, seek to the next row
- Key seekKey = iterTopKey.followingKey(PartialKey.ROW);
-
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch seeking to => " + seekKey);
- }
-
- TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
-
- continue;
- }
-
- // Advance to the correct term
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch termCompare: " + getTerm(iterTopKey) + " == " + TS.term);
- }
-
- int termCompare = getTerm(iterTopKey).compareTo(TS.term);
-
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch termCompare = " + termCompare);
- }
-
- // Make sure we're at a row for this term
- if (termCompare < 0) {
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch seek to desired term");
- }
-
- Key seekKey = new Key(iterTopKey.getRow(), iterTopKey.getColumnFamily(), TS.term);
-
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch seeking to => " + seekKey);
- }
-
- TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
-
- continue;
- } else if (termCompare > 0) {
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch advanced beyond desired term, seek to next row");
- }
-
- // Gone past the current term, seek to the next row
- Key seekKey = iterTopKey.followingKey(PartialKey.ROW);
-
- if (log.isDebugEnabled()) {
- log.debug("OI.advanceToMatch seeking to => " + seekKey);
- }
-
- TS.iter.seek(new Range(seekKey, true, null, false), columnFamilies, inclusive);
- continue;
+ log.debug("myUid: " + myUid);
}
-
- // If we made it here, we found a match
- matched = true;
- }
- }
-
- public boolean jump(Key jumpKey) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("OR jump: " + jumpKey);
- printTopKeysForTermSources();
- }
-
- // is the jumpKey outside my overall range?
- if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
- //can't go there.
- if (log.isDebugEnabled()) {
- log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
- }
- return false;
- }
-
- // Clear the PriorityQueue so that we can re-populate it.
- sorted.clear();
-
- // check each term source and jump it if necessary.
- for (TermSource ts : sources) {
- int comp;
- if (!ts.hasTop()) {
- if (log.isDebugEnabled()) {
- log.debug("jump called, but ts.topKey is null, this one needs to move to next row.");
- }
- Key endKey = null;
- if (parentEndRow != null) {
- endKey = new Key(parentEndRow);
- }
- Range newRange = new Range(jumpKey, true, endKey, false);
- ts.iter.seek(newRange, columnFamilies, inclusive);
- ts.setNew();
- advanceToMatch(ts);
- ts.setNew();
-
+
+ if (jumpUid == null) {
+ log.debug("jumpUid is null");
} else {
- // check row, then uid
- comp = this.topKey.getRow().compareTo(jumpKey.getRow());
- if (comp > 0) {
- if (log.isDebugEnabled()) {
- log.debug("jump, our row is ahead of jumpKey.");
- log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + topKey.getRow() + " parentEndRow" + parentEndRow);
- }
- if (ts.hasTop()) {
- sorted.add(ts);
- }
- // do nothing, we're ahead of jumpKey row and have topkey
- } else if (comp < 0) { //a row behind jump key, need to move forward
- if (log.isDebugEnabled()) {
- log.debug("OR jump, row jump");
- }
- Key endKey = null;
- if (parentEndRow != null) {
- endKey = new Key(parentEndRow);
- }
- Key sKey = new Key(jumpKey.getRow());
- Range fake = new Range(sKey, true, endKey, false);
- ts.iter.seek(fake, columnFamilies, inclusive);
- ts.setNew();
- advanceToMatch(ts);
- ts.setNew();
- } else {
- //need to check uid
- String myUid = getUID(ts.topKey);
- String jumpUid = getUID(jumpKey);
-
- if (log.isDebugEnabled()) {
- if (myUid == null) {
- log.debug("myUid is null");
- } else {
- log.debug("myUid: " + myUid);
- }
-
- if (jumpUid == null) {
- log.debug("jumpUid is null");
- } else {
- log.debug("jumpUid: " + jumpUid);
- }
- }
-
- int ucomp = myUid.compareTo(jumpUid);
- if (ucomp < 0) {
- // need to move forward
- // create range and seek it.
- Text row = ts.topKey.getRow();
- Text cf = ts.topKey.getColumnFamily();
- String cq = ts.topKey.getColumnQualifier().toString().replaceAll(myUid, jumpUid);
- Text cq_text = new Text(cq);
- Key sKey = new Key(row, cf, cq_text);
- Key eKey = null;
- if (parentEndRow != null) {
- eKey = new Key(parentEndRow);
- }
- Range fake = new Range(sKey, true, eKey, false);
- if (log.isDebugEnabled()) {
- log.debug("uid jump, new ts.iter.seek range: " + fake);
- }
- ts.iter.seek(fake, columnFamilies, inclusive);
- ts.setNew();
- advanceToMatch(ts);
- ts.setNew();
-
- if (log.isDebugEnabled()) {
- if (ts.iter.hasTop()) {
- log.debug("ts.iter.topkey: " + ts.iter.getTopKey());
- } else {
- log.debug("ts.iter.topKey is null");
- }
- }
- }//else do nothing, we're ahead of jump key
- }
+ log.debug("jumpUid: " + jumpUid);
}
-
- // ts should have moved, validate this particular ts.
- if (ts.hasTop()) {
- if (overallRange != null) {
- if (overallRange.contains(topKey)) {
- //if (topKey.getRow().compareTo(parentEndRow) < 0) {
- sorted.add(ts);
- }
- } else {
- sorted.add(ts);
- }
- }
- }
- // now get the top key from all TermSources.
- currentTerm = sorted.poll();
- if (log.isDebugEnabled()) {
- log.debug("OI.jump currentTerm = " + currentTerm);
- }
-
- topKey = buildTopKey(currentTerm);
- if (log.isDebugEnabled()) {
- log.debug("OI.jump new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
+ }
+
+ // NOTE(review): the logging above shows getUID() can return null; compareTo would then throw NullPointerException.
+ int ucomp = myUid.compareTo(jumpUid);
+ if (ucomp < 0) {
+ // need to move forward
+ // create range and seek it.
+ Text row = ts.topKey.getRow();
+ Text cf = ts.topKey.getColumnFamily();
+ // Literal replacement: the uids are data, not a regular expression / replacement pattern.
+ String cq = ts.topKey.getColumnQualifier().toString().replace(myUid, jumpUid);
+ Text cq_text = new Text(cq);
+ Key sKey = new Key(row, cf, cq_text);
+ Key eKey = null;
+ if (parentEndRow != null) {
+ eKey = new Key(parentEndRow);
+ }
+ Range fake = new Range(sKey, true, eKey, false);
+ if (log.isDebugEnabled()) {
+ log.debug("uid jump, new ts.iter.seek range: " + fake);
+ }
+ ts.iter.seek(fake, columnFamilies, inclusive);
+ ts.setNew();
+ advanceToMatch(ts);
+ ts.setNew();
+
+ if (log.isDebugEnabled()) {
+ if (ts.iter.hasTop()) {
+ log.debug("ts.iter.topkey: " + ts.iter.getTopKey());
+ } else {
+ log.debug("ts.iter.topKey is null");
+ }
+ }
+ }// else do nothing, we're ahead of jump key
+ }
+ }
+
+ // ts should have moved, validate this particular ts (its own top key, not the stale iterator-wide topKey).
+ if (ts.hasTop()) {
+ if (overallRange != null) {
+ if (overallRange.contains(ts.topKey)) {
+ sorted.add(ts);
+ }
+ } else {
+ sorted.add(ts);
}
- return hasTop();
+ }
}
-
- private void printTopKeysForTermSources() {
- if (log.isDebugEnabled()) {
- for (TermSource ts : sources) {
- if (ts != null) {
- if (ts.topKey == null) {
- log.debug(ts.toString() + " topKey is null");
- } else {
- log.debug(ts.toString() + " topKey: " + ts.topKey);
- }
- } else {
- log.debug("ts is null");
- }
- }
-
- if (topKey != null) {
- log.debug("OrIterator current topKey: " + topKey);
- } else {
- log.debug("OrIterator current topKey is null");
- }
+ // now get the top key from all TermSources.
+ currentTerm = sorted.poll();
+ if (log.isDebugEnabled()) {
+ log.debug("OI.jump currentTerm = " + currentTerm);
+ }
+
+ topKey = buildTopKey(currentTerm);
+ if (log.isDebugEnabled()) {
+ log.debug("OI.jump new topKey >>" + ((topKey == null) ? "false" : topKey) + "<< ");
+ }
+ return hasTop();
+ }
+
+ /**
+ * Logs, at debug level, the top key of every <code>TermSource</code> and this iterator's current <code>topKey</code>.
+ */
+ private void printTopKeysForTermSources() {
+ if (log.isDebugEnabled()) {
+ for (TermSource ts : sources) {
+ if (ts != null) {
+ if (ts.topKey == null) {
+ log.debug(ts.toString() + " topKey is null");
+ } else {
+ log.debug(ts.toString() + " topKey: " + ts.topKey);
+ }
+ } else {
+ log.debug("ts is null");
}
+ }
+
+ if (topKey != null) {
+ log.debug("OrIterator current topKey: " + topKey);
+ } else {
+ log.debug("OrIterator current topKey is null");
+ }
}
+ }
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package iterator;
import java.io.IOException;
@@ -37,265 +37,261 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
/**
- * This iterator takes the source iterator (the one below it in the iterator stack)
- * and puts it in a background thread. The background thread continues processing
- * and fills a queue with the Keys and Values from the source iterator. When seek()
- * is called on this iterator, it pauses the background thread, clears the queue,
- * calls seek() on the source iterator, then resumes the thread filling the queue.
- *
- * Users of this iterator can set the queue size, default is five elements. Users
- * must be aware of the potential for OutOfMemory errors when using this iterator
- * with large queue sizes or large objects. This iterator copies the Key and Value from
- * the source iterator and puts them into the queue.
+ * This iterator takes the source iterator (the one below it in the iterator stack) and puts it in a background thread. The background thread continues
+ * processing and fills a queue with the Keys and Values from the source iterator. When seek() is called on this iterator, it pauses the background thread,
+ * clears the queue, calls seek() on the source iterator, then resumes the thread filling the queue.
+ *
+ * Users of this iterator can set the queue size; the default is five elements. Users must be aware of the potential for OutOfMemory errors when using this iterator
+ * with large queue sizes or large objects. This iterator copies the Key and Value from the source iterator and puts them into the queue.
*
- * This iterator introduces some parallelism into the server side iterator stack. One use
- * case for this would be when an iterator takes a relatively long time to process each
- * K,V pair and causes the iterators above it to wait. By putting the longer running iterator
- * in a background thread we should be able to achieve greater throughput.
+ * This iterator introduces some parallelism into the server side iterator stack. One use case for this would be when an iterator takes a relatively long time
+ * to process each K,V pair and causes the iterators above it to wait. By putting the longer running iterator in a background thread we should be able to
+ * achieve greater throughput.
*
* NOTE: Experimental!
- *
+ *
*/
-public class ReadAheadIterator implements SortedKeyValueIterator<Key, Value>, OptionDescriber {
-
- private static Logger log = Logger.getLogger(ReadAheadIterator.class);
-
- public static final String QUEUE_SIZE = "queue.size";
-
- public static final String TIMEOUT = "timeout";
-
- private static final QueueElement noMoreDataElement = new QueueElement();
-
- private int queueSize = 5;
-
- private int timeout = 60;
-
- /**
- *
- * Class to hold key and value from the producing thread.
- *
- */
- static class QueueElement {
- Key key = null;
- Value value = null;
- public QueueElement() {}
- public QueueElement(Key key, Value value) {
- super();
- this.key = new Key(key);
- this.value = new Value(value.get(),true);
- }
- public Key getKey() {
- return key;
- }
- public Value getValue() {
- return value;
- }
- }
-
- /**
- *
- * Thread that produces data from the source iterator and places the results
- * in a queue.
- *
- */
- class ProducerThread extends ReentrantLock implements Runnable {
-
- private static final long serialVersionUID = 1L;
-
- private Exception e = null;
-
- private int waitTime = timeout;
-
- private SortedKeyValueIterator<Key, Value> sourceIter = null;
-
- public ProducerThread(SortedKeyValueIterator<Key, Value> source) {
- this.sourceIter = source;
- }
-
- public void run() {
- boolean hasMoreData = true;
- //Keep this thread running while there is more data to read
- //and items left in the queue to be read off.
- while (hasMoreData || queue.size() > 0) {
- try {
- //Acquire the lock, this will wait if the lock is being
- //held by the ReadAheadIterator.seek() method.
- this.lock();
- //Check to see if there is more data from the iterator below.
- hasMoreData = sourceIter.hasTop();
- //Break out of the loop if no more data.
- if (!hasMoreData)
- continue;
- //Put the next K,V onto the queue.
- try {
- QueueElement e = new QueueElement(sourceIter.getTopKey(), sourceIter.getTopValue());
- boolean inserted = false;
- try {
- inserted = queue.offer(e, this.waitTime, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- this.e = ie;
- break;
- }
- if (!inserted) {
- //Then we either got a timeout, set the error and break out of the loop
- this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
- break;
- }
- //Move the iterator to the next K,V for the next iteration of this loop
- sourceIter.next();
- } catch (Exception e) {
- this.e = e;
- log.error("Error calling next on source iterator", e);
- break;
- }
- } finally {
- this.unlock();
- }
- }
- //If we broke out of the loop because of an error, then don't put the marker on the queue, just to do end.
- if (!hasError()) {
- //Put the special end of data marker into the queue
- try {
- queue.put(noMoreDataElement);
- } catch (InterruptedException e) {
- this.e = e;
- log.error("Error putting End of Data marker onto queue");
- }
- }
- }
-
- public boolean hasError() {
- return (this.e != null);
- }
-
- public Exception getError() {
- return this.e;
- }
- }
-
- private SortedKeyValueIterator<Key, Value> source;
- private ArrayBlockingQueue<QueueElement> queue = null;
- private QueueElement currentElement = new QueueElement();
- private ProducerThread thread = null;
- private Thread t = null;
-
- protected ReadAheadIterator(ReadAheadIterator other, IteratorEnvironment env)
- {
- source = other.source.deepCopy(env);
- }
-
- public ReadAheadIterator(){}
-
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- return new ReadAheadIterator(this, env);
- }
-
- public Key getTopKey() {
- return currentElement.getKey();
- }
-
- public Value getTopValue() {
- return currentElement.getValue();
- }
-
- public boolean hasTop() {
- if (currentElement == noMoreDataElement)
- return false;
- return currentElement != null || queue.size() > 0 || source.hasTop();
- }
-
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- validateOptions(options);
- this.source = source;
- queue = new ArrayBlockingQueue<QueueElement>(queueSize);
- thread = new ProducerThread(this.source);
- t = new Thread(thread, "ReadAheadIterator-SourceThread");
- t.start();
- }
-
- /**
- * Populate the key and value
- */
- public void next() throws IOException {
- //Thread startup race condition, need to make sure that the
- //thread has started before we call this the first time.
- while (t.getState().equals(State.NEW)) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {}
- }
-
- if (t.getState().equals(State.TERMINATED)) {
- //Thread encountered an error.
- if (thread.hasError()) {
- //and it should
- throw new IOException("Background thread has died", thread.getError());
- }
- }
-
- //Pull an element off the queue, this will wait if there is no data yet.
- try {
- if (thread.hasError())
- throw new IOException("background thread has error", thread.getError());
-
- QueueElement nextElement = null;
- while (null == nextElement) {
- try {
- nextElement = queue.poll(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- //TODO: Do we need to do anything here?
- }
- if (null == nextElement) {
- //Then we have no data and timed out, check for error condition in the read ahead thread
- if (thread.hasError()) {
- throw new IOException("background thread has error", thread.getError());
- }
- }
- }
- currentElement = nextElement;
- } catch (IOException e) {
- throw new IOException("Error getting element from source iterator", e);
- }
- }
-
- /**
- * Seek to the next matching cell and call next to populate the key and value.
- */
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- if (t.isAlive()) {
- //Check for error
- if (thread.hasError())
- throw new IOException("background thread has error", thread.getError());
-
- try {
- //Acquire the lock, or wait until its unlocked by the producer thread.
- thread.lock();
- queue.clear();
- currentElement = null;
- source.seek(range, columnFamilies, inclusive);
- } finally {
- thread.unlock();
- }
- next();
- } else {
- throw new IOException("source iterator thread has died.");
- }
- }
-
- public IteratorOptions describeOptions() {
- Map<String,String> options = new HashMap<String,String>();
- options.put(QUEUE_SIZE, "read ahead queue size");
- options.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
- return new IteratorOptions(getClass().getSimpleName(),"Iterator that puts the source in another thread",
- options, null);
- }
-
- public boolean validateOptions(Map<String, String> options) {
- if (options.containsKey(QUEUE_SIZE))
- queueSize = Integer.parseInt(options.get(QUEUE_SIZE));
- if (options.containsKey(TIMEOUT))
- timeout = Integer.parseInt(options.get(TIMEOUT));
- return true;
- }
-
+public class ReadAheadIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+
+ private static Logger log = Logger.getLogger(ReadAheadIterator.class);
+
+ public static final String QUEUE_SIZE = "queue.size";
+
+ public static final String TIMEOUT = "timeout";
+
+ private static final QueueElement noMoreDataElement = new QueueElement();
+
+ private int queueSize = 5;
+
+ private int timeout = 60;
+
+ /**
+ *
+ * Class to hold key and value from the producing thread.
+ *
+ */
+ static class QueueElement {
+ Key key = null;
+ Value value = null;
+
+ public QueueElement() {}
+
+ public QueueElement(Key key, Value value) {
+ super();
+ this.key = new Key(key);
+ this.value = new Value(value.get(), true);
+ }
+
+ public Key getKey() {
+ return key;
+ }
+
+ public Value getValue() {
+ return value;
+ }
+ }
+
+ /**
+ *
+ * Thread that produces data from the source iterator and places the results in a queue.
+ *
+ */
+ class ProducerThread extends ReentrantLock implements Runnable {
+
+ private static final long serialVersionUID = 1L;
+
+ private Exception e = null;
+
+ private int waitTime = timeout;
+
+ private SortedKeyValueIterator<Key,Value> sourceIter = null;
+
+ public ProducerThread(SortedKeyValueIterator<Key,Value> source) {
+ this.sourceIter = source;
+ }
+
+ public void run() {
+ boolean hasMoreData = true;
+ // Keep this thread running while there is more data to read
+ // and items left in the queue to be read off.
+ while (hasMoreData || queue.size() > 0) {
+ try {
+ // Acquire the lock, this will wait if the lock is being
+ // held by the ReadAheadIterator.seek() method.
+ this.lock();
+ // Check to see if there is more data from the iterator below.
+ hasMoreData = sourceIter.hasTop();
+ // Break out of the loop if no more data.
+ if (!hasMoreData)
+ continue;
+ // Put the next K,V onto the queue.
+ try {
+ QueueElement e = new QueueElement(sourceIter.getTopKey(), sourceIter.getTopValue());
+ boolean inserted = false;
+ try {
+ inserted = queue.offer(e, this.waitTime, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ this.e = ie;
+ break;
+ }
+ if (!inserted) {
+ // Then we either got a timeout, set the error and break out of the loop
+ this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
+ break;
+ }
+ // Move the iterator to the next K,V for the next iteration of this loop
+ sourceIter.next();
+ } catch (Exception e) {
+ this.e = e;
+ log.error("Error calling next on source iterator", e);
+ break;
+ }
+ } finally {
+ this.unlock();
+ }
+ }
+ // If we broke out of the loop because of an error, then don't put the marker on the queue, just to do end.
+ if (!hasError()) {
+ // Put the special end of data marker into the queue
+ try {
+ queue.put(noMoreDataElement);
+ } catch (InterruptedException e) {
+ this.e = e;
+ log.error("Error putting End of Data marker onto queue");
+ }
+ }
+ }
+
+ public boolean hasError() {
+ return (this.e != null);
+ }
+
+ public Exception getError() {
+ return this.e;
+ }
+ }
+
+ private SortedKeyValueIterator<Key,Value> source;
+ private ArrayBlockingQueue<QueueElement> queue = null;
+ private QueueElement currentElement = new QueueElement();
+ private ProducerThread thread = null;
+ private Thread t = null;
+
+ protected ReadAheadIterator(ReadAheadIterator other, IteratorEnvironment env) {
+ source = other.source.deepCopy(env);
+ }
+
+ public ReadAheadIterator() {}
+
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new ReadAheadIterator(this, env);
+ }
+
+ public Key getTopKey() {
+ return currentElement.getKey();
+ }
+
+ public Value getTopValue() {
+ return currentElement.getValue();
+ }
+
+ public boolean hasTop() {
+ if (currentElement == noMoreDataElement)
+ return false;
+ return currentElement != null || queue.size() > 0 || source.hasTop();
+ }
+
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ validateOptions(options);
+ this.source = source;
+ queue = new ArrayBlockingQueue<QueueElement>(queueSize);
+ thread = new ProducerThread(this.source);
+ t = new Thread(thread, "ReadAheadIterator-SourceThread");
+ t.start();
+ }
+
+ /**
+ * Populate the key and value
+ */
+ public void next() throws IOException {
+ // Thread startup race condition, need to make sure that the
+ // thread has started before we call this the first time.
+ while (t.getState().equals(State.NEW)) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {}
+ }
+
+ if (t.getState().equals(State.TERMINATED)) {
+ // Thread encountered an error.
+ if (thread.hasError()) {
+ // and it should
+ throw new IOException("Background thread has died", thread.getError());
+ }
+ }
+
+ // Pull an element off the queue, this will wait if there is no data yet.
+ try {
+ if (thread.hasError())
+ throw new IOException("background thread has error", thread.getError());
+
+ QueueElement nextElement = null;
+ while (null == nextElement) {
+ try {
+ nextElement = queue.poll(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // TODO: Do we need to do anything here?
+ }
+ if (null == nextElement) {
+ // Then we have no data and timed out, check for error condition in the read ahead thread
+ if (thread.hasError()) {
+ throw new IOException("background thread has error", thread.getError());
+ }
+ }
+ }
+ currentElement = nextElement;
+ } catch (IOException e) {
+ throw new IOException("Error getting element from source iterator", e);
+ }
+ }
+
+ /**
+ * Seek to the next matching cell and call next to populate the key and value.
+ */
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ if (t.isAlive()) {
+ // Check for error
+ if (thread.hasError())
+ throw new IOException("background thread has error", thread.getError());
+
+ try {
+ // Acquire the lock, or wait until its unlocked by the producer thread.
+ thread.lock();
+ queue.clear();
+ currentElement = null;
+ source.seek(range, columnFamilies, inclusive);
+ } finally {
+ thread.unlock();
+ }
+ next();
+ } else {
+ throw new IOException("source iterator thread has died.");
+ }
+ }
+
+ public IteratorOptions describeOptions() {
+ Map<String,String> options = new HashMap<String,String>();
+ options.put(QUEUE_SIZE, "read ahead queue size");
+ options.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
+ return new IteratorOptions(getClass().getSimpleName(), "Iterator that puts the source in another thread", options, null);
+ }
+
+ public boolean validateOptions(Map<String,String> options) {
+ if (options.containsKey(QUEUE_SIZE))
+ queueSize = Integer.parseInt(options.get(QUEUE_SIZE));
+ if (options.containsKey(TIMEOUT))
+ timeout = Integer.parseInt(options.get(TIMEOUT));
+ return true;
+ }
+
}