You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC
svn commit: r1210600 [4/16] - in
/incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/
ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/
ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/
ingest/src/main/java/protobuf/ ...
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package iterator;
import java.io.IOException;
@@ -43,286 +43,280 @@ import com.esotericsoftware.kryo.Kryo;
/**
*
- * This iterator aggregates rows together using the specified key comparator. Subclasses will
- * provide their own implementation of fillMap which will fill the supplied EventFields object
- * with field names (key) and field values (value). After all fields have been put into the
- * aggregated object (by aggregating all columns with the same key), the EventFields object will be compared
- * against the supplied expression. If the expression returns true, then the return key and
- * return value can be retrieved via getTopKey() and getTopValue().
+ * This iterator aggregates rows together using the specified key comparator. Subclasses will provide their own implementation of fillMap which will fill the
+ * supplied EventFields object with field names (key) and field values (value). After all fields have been put into the aggregated object (by aggregating all
+ * columns with the same key), the EventFields object will be compared against the supplied expression. If the expression returns true, then the return key and
+ * return value can be retrieved via getTopKey() and getTopValue().
*
- * Optionally, the caller can set an expression (field operator value) that should not be evaluated against
- * the event. For example, if the query is "A == 'foo' and B == 'bar'", but for some reason B may not be in
- * the data, then setting the UNEVALUATED_EXPRESSIONS option to "B == 'bar'" will allow the events to be
- * evaluated against the remainder of the expression and still return as true.
+ * Optionally, the caller can set an expression (field operator value) that should not be evaluated against the event. For example, if the query is
+ * "A == 'foo' and B == 'bar'", but for some reason B may not be in the data, then setting the "unevaluated.expressions" option (UNEVALUTED_EXPRESSIONS) to "B == 'bar'" will allow the
+ * events to be evaluated against the remainder of the expression and still return as true.
*
- * By default this iterator will return all Events in the shard. If the START_DATE and
- * END_DATE are specified, then this iterator will evaluate the timestamp of the key against
- * the start and end dates. If the event date is not within the range of start to end, then it is skipped.
+ * By default this iterator will return all Events in the shard. If the START_DATE and END_DATE are specified, then this iterator will evaluate the timestamp of
+ * the key against the start and end dates. If the event date is not within the range of start to end, then it is skipped.
*
- * This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value.
+ * This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value.
*
*/
-public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key, Value>, OptionDescriber {
-
- private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
- protected static final byte[] NULL_BYTE = new byte[0];
- public static final String QUERY_OPTION = "expr";
- public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
-
- private PartialKey comparator = null;
- private SortedKeyValueIterator<Key, Value> iterator;
- private Key currentKey = new Key();
- private Key returnKey;
- private Value returnValue;
- private String expression;
- private QueryEvaluator evaluator;
- private EventFields event = null;
- private static Kryo kryo = new Kryo();
- private Range seekRange = null;
- private Set<String> skipExpressions = null;
-
- protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
- iterator = other.iterator.deepCopy(env);
- event = other.event;
- }
-
- public AbstractEvaluatingIterator() {
- }
-
- /**
- * Implementations will return the PartialKey value to use for comparing keys for aggregating events
- *
- * @return the type of comparator to use
- */
- public abstract PartialKey getKeyComparator();
-
- /**
- * When the query expression evaluates to true against the event, the event fields
- * will be serialized into the Value and returned up the iterator stack. Implemenations
- * will need to provide a key to be used with the event.
- *
- * @param k
- * @return the key that should be returned with the map of values.
- */
- public abstract Key getReturnKey(Key k) throws Exception;
-
- /**
- * Implementations will need to fill the map with field visibilities, names, and
- * values. When all fields have been aggregated the event will be evaluated against
- * the query expression.
- *
- * @param event Multimap of event names and fields.
- * @param key current Key
- * @param value current Value
- */
- public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
-
- /**
- * Check to see if this key should be acted upon. Provides the ability to skip this key
- * and all of the following ones that match using the comparator.
- *
- * @param key
- * @return
- */
- public abstract boolean isKeyAccepted(Key key);
-
- /**
- * Reset state.
- */
- public void reset() {
- event.clear();
+public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+
+ private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
+ protected static final byte[] NULL_BYTE = new byte[0];
+ public static final String QUERY_OPTION = "expr";
+ public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
+
+ private PartialKey comparator = null;
+ private SortedKeyValueIterator<Key,Value> iterator;
+ private Key currentKey = new Key();
+ private Key returnKey;
+ private Value returnValue;
+ private String expression;
+ private QueryEvaluator evaluator;
+ private EventFields event = null;
+ private static Kryo kryo = new Kryo();
+ private Range seekRange = null;
+ private Set<String> skipExpressions = null;
+
+ protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
+ iterator = other.iterator.deepCopy(env);
+ event = other.event;
+ }
+
+ public AbstractEvaluatingIterator() {}
+
+ /**
+ * Implementations will return the PartialKey value to use for comparing keys for aggregating events
+ *
+ * @return the type of comparator to use
+ */
+ public abstract PartialKey getKeyComparator();
+
+ /**
+ * When the query expression evaluates to true against the event, the event fields will be serialized into the Value and returned up the iterator stack.
+ * Implementations will need to provide a key to be used with the event.
+ *
+ * @param k the key of the first entry that was aggregated into the event
+ * @return the key that should be returned with the map of values.
+ */
+ public abstract Key getReturnKey(Key k) throws Exception;
+
+ /**
+ * Implementations will need to fill the map with field visibilities, names, and values. When all fields have been aggregated the event will be evaluated
+ * against the query expression.
+ *
+ * @param event
+ * Multimap of event names and fields.
+ * @param key
+ * current Key
+ * @param value
+ * current Value
+ */
+ public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
+
+ /**
+ * Check to see if this key should be acted upon. Provides the ability to skip this key and all of the following ones that match using the comparator.
+ *
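+ * @param key the key currently at the top of the source iterator
+ * @return true if the key should be processed; false if it should be skipped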
+ */
+ public abstract boolean isKeyAccepted(Key key);
+
+ /**
+ * Reset state.
+ */
+ public void reset() {
+ event.clear();
+ }
+
+ private void aggregateRowColumn(EventFields event) throws IOException {
+
+ currentKey.set(iterator.getTopKey());
+
+ try {
+ fillMap(event, iterator.getTopKey(), iterator.getTopValue());
+ iterator.next();
+
+ while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator)) {
+ fillMap(event, iterator.getTopKey(), iterator.getTopValue());
+ iterator.next();
+ }
+
+ // Get the return key
+ returnKey = getReturnKey(currentKey);
+ } catch (Exception e) {
+ throw new IOException("Error aggregating event", e);
}
-
- private void aggregateRowColumn(EventFields event) throws IOException {
-
- currentKey.set(iterator.getTopKey());
-
- try {
- fillMap(event, iterator.getTopKey(), iterator.getTopValue());
- iterator.next();
-
- while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator)) {
- fillMap(event, iterator.getTopKey(), iterator.getTopValue());
- iterator.next();
- }
-
- //Get the return key
- returnKey = getReturnKey(currentKey);
- } catch (Exception e) {
- throw new IOException("Error aggregating event", e);
+
+ }
+
+ private void findTop() throws IOException {
+ do {
+ reset();
+ // check if aggregation is needed
+ if (iterator.hasTop()) {
+ // Check to see if the current key is accepted. For example in the wiki
+ // table there are field index rows. We don't want to process those in
+ // some cases so return right away. Consume all of the non-accepted keys
+ while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
+ iterator.next();
}
-
- }
-
- private void findTop() throws IOException {
- do {
- reset();
- //check if aggregation is needed
- if (iterator.hasTop()) {
- //Check to see if the current key is accepted. For example in the wiki
- //table there are field index rows. We don't want to process those in
- //some cases so return right away. Consume all of the non-accepted keys
- while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
- iterator.next();
- }
-
- if (iterator.hasTop()) {
- aggregateRowColumn(event);
-
- //Evaluate the event against the expression
- if (event.size() > 0 && this.evaluator.evaluate(event)) {
- if (log.isDebugEnabled()) {
- log.debug("Event evaluated to true, key = " + returnKey);
- }
- //Create a byte array
- byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
- //Wrap in ByteBuffer to work with Kryo
- ByteBuffer buf = ByteBuffer.wrap(serializedMap);
- //Serialize the EventFields object
- event.writeObjectData(kryo, buf);
- //Truncate array to the used size.
- returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
- } else {
- returnKey = null;
- returnValue = null;
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Iterator no longer has top.");
- }
- }
- } else {
- log.info("Iterator.hasTop() == false");
+
+ if (iterator.hasTop()) {
+ aggregateRowColumn(event);
+
+ // Evaluate the event against the expression
+ if (event.size() > 0 && this.evaluator.evaluate(event)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Event evaluated to true, key = " + returnKey);
}
- } while (returnValue == null && iterator.hasTop());
-
- //Sanity check. Make sure both returnValue and returnKey are null or both are not null
- if (!((returnKey == null && returnValue == null) || (returnKey != null && returnValue != null))) {
- log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
- log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
- throw new IOException("Return values are inconsistent");
- }
- }
-
- public Key getTopKey() {
- if (returnKey != null) {
- return returnKey;
- }
- return iterator.getTopKey();
- }
-
- public Value getTopValue() {
- if (returnValue != null) {
- return returnValue;
- }
- return iterator.getTopValue();
- }
-
- public boolean hasTop() {
- return returnKey != null || iterator.hasTop();
- }
-
- public void next() throws IOException {
- if (returnKey != null) {
+ // Create a byte array
+ byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
+ // Wrap in ByteBuffer to work with Kryo
+ ByteBuffer buf = ByteBuffer.wrap(serializedMap);
+ // Serialize the EventFields object
+ event.writeObjectData(kryo, buf);
+ // Truncate array to the used size.
+ returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
+ } else {
returnKey = null;
returnValue = null;
- } else if (iterator.hasTop()) {
- iterator.next();
- }
-
- findTop();
- }
-
- /**
- * Copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError
- * @param range
- * @return
- */
- static Range maximizeStartKeyTimeStamp(Range range) {
- Range seekRange = range;
-
- if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
- Key seekKey = new Key(seekRange.getStartKey());
- seekKey.setTimestamp(Long.MAX_VALUE);
- seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
- }
-
- return seekRange;
- }
-
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- //do not want to seek to the middle of a value that should be
- //aggregated...
-
- seekRange = maximizeStartKeyTimeStamp(range);
-
- iterator.seek(seekRange, columnFamilies, inclusive);
- findTop();
-
- if (range.getStartKey() != null) {
- while (hasTop()
- && getTopKey().equals(range.getStartKey(), this.comparator)
- && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
- //the value has a more recent time stamp, so
- //pass it up
- //log.debug("skipping "+getTopKey());
- next();
- }
-
- while (hasTop() && range.beforeStartKey(getTopKey())) {
- next();
- }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Iterator no longer has top.");
+ }
}
-
+ } else {
+ log.info("Iterator.hasTop() == false");
+ }
+ } while (returnValue == null && iterator.hasTop());
+
+ // Sanity check. Make sure both returnValue and returnKey are null or both are not null
+ if (!((returnKey == null && returnValue == null) || (returnKey != null && returnValue != null))) {
+ log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
+ log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
+ throw new IOException("Return values are inconsistent");
+ }
+ }
+
+ public Key getTopKey() {
+ if (returnKey != null) {
+ return returnKey;
+ }
+ return iterator.getTopKey();
+ }
+
+ public Value getTopValue() {
+ if (returnValue != null) {
+ return returnValue;
+ }
+ return iterator.getTopValue();
+ }
+
+ public boolean hasTop() {
+ return returnKey != null || iterator.hasTop();
+ }
+
+ public void next() throws IOException {
+ if (returnKey != null) {
+ returnKey = null;
+ returnValue = null;
+ } else if (iterator.hasTop()) {
+ iterator.next();
}
-
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- validateOptions(options);
- event = new EventFields();
- this.comparator = getKeyComparator();
- this.iterator = source;
- try {
- //Replace any expressions that we should not evaluate.
- if (null != this.skipExpressions && this.skipExpressions.size() != 0) {
- for (String skip : this.skipExpressions) {
- //Expression should have form: field<sp>operator<sp>literal.
- //We are going to replace the expression with field == null.
- String field = skip.substring(0, skip.indexOf(" ") -1);
- this.expression = this.expression.replaceAll(skip, field+" == null");
- }
- }
- this.evaluator = new QueryEvaluator(this.expression);
- } catch (ParseException e) {
- throw new IllegalArgumentException("Failed to parse query", e);
- }
- EventFields.initializeKryo(kryo);
+
+ findTop();
+ }
+
+ /**
+ * Copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError
+ *
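+ * @param range the range passed to seek
+ * @return the given range if it has no start key or the start key's timestamp is already Long.MAX_VALUE; otherwise a new range whose inclusive start key is a copy of the original start key with its timestamp set to Long.MAX_VALUE and whose end key and end inclusiveness are unchanged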
+ */
+ static Range maximizeStartKeyTimeStamp(Range range) {
+ Range seekRange = range;
+
+ if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
+ Key seekKey = new Key(seekRange.getStartKey());
+ seekKey.setTimestamp(Long.MAX_VALUE);
+ seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
}
-
- public IteratorOptions describeOptions() {
- Map<String,String> options = new HashMap<String,String>();
- options.put(QUERY_OPTION, "query expression");
- options.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");
- return new IteratorOptions(getClass().getSimpleName(), "evaluates event objects against an expression", options, null);
+
+ return seekRange;
+ }
+
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ // do not want to seek to the middle of a value that should be
+ // aggregated...
+
+ seekRange = maximizeStartKeyTimeStamp(range);
+
+ iterator.seek(seekRange, columnFamilies, inclusive);
+ findTop();
+
+ if (range.getStartKey() != null) {
+ while (hasTop() && getTopKey().equals(range.getStartKey(), this.comparator) && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
+ // the value has a more recent time stamp, so
+ // pass it up
+ // log.debug("skipping "+getTopKey());
+ next();
+ }
+
+ while (hasTop() && range.beforeStartKey(getTopKey())) {
+ next();
+ }
}
-
- public boolean validateOptions(Map<String, String> options) {
- if (!options.containsKey(QUERY_OPTION))
- return false;
- else
- this.expression = options.get(QUERY_OPTION);
-
- if (options.containsKey(UNEVALUTED_EXPRESSIONS)) {
- String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
- if (expressionList != null && !expressionList.trim().equals("")) {
- this.skipExpressions = new HashSet<String>();
- for (String e : expressionList.split(","))
- this.skipExpressions.add(e);
- }
+
+ }
+
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ validateOptions(options);
+ event = new EventFields();
+ this.comparator = getKeyComparator();
+ this.iterator = source;
+ try {
+ // Replace any expressions that we should not evaluate.
+ if (null != this.skipExpressions && this.skipExpressions.size() != 0) {
+ for (String skip : this.skipExpressions) {
+ // Expression should have form: field<sp>operator<sp>literal.
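+ // We are going to replace the expression with field == null. NOTE(review): substring's end index is exclusive, so indexOf(" ") - 1 drops the last character of the field name (and is negative when skip contains no space); replaceAll also interprets skip as a regular expression -- confirm intent.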
+ String field = skip.substring(0, skip.indexOf(" ") - 1);
+ this.expression = this.expression.replaceAll(skip, field + " == null");
}
- return true;
- }
-
- public String getQueryExpression() {
- return this.expression;
- }
+ }
+ this.evaluator = new QueryEvaluator(this.expression);
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Failed to parse query", e);
+ }
+ EventFields.initializeKryo(kryo);
+ }
+
+ public IteratorOptions describeOptions() {
+ Map<String,String> options = new HashMap<String,String>();
+ options.put(QUERY_OPTION, "query expression");
+ options.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");
+ return new IteratorOptions(getClass().getSimpleName(), "evaluates event objects against an expression", options, null);
+ }
+
+ public boolean validateOptions(Map<String,String> options) {
+ if (!options.containsKey(QUERY_OPTION))
+ return false;
+ else
+ this.expression = options.get(QUERY_OPTION);
+
+ if (options.containsKey(UNEVALUTED_EXPRESSIONS)) {
+ String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
+ if (expressionList != null && !expressionList.trim().equals("")) {
+ this.skipExpressions = new HashSet<String>();
+ for (String e : expressionList.split(","))
+ this.skipExpressions.add(e);
+ }
+ }
+ return true;
+ }
+
+ public String getQueryExpression() {
+ return this.expression;
+ }
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package iterator;
import java.io.IOException;
@@ -32,896 +32,909 @@ import org.apache.commons.codec.binary.B
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
-public class AndIterator implements SortedKeyValueIterator<Key, Value> {
-
- protected static final Logger log = Logger.getLogger(AndIterator.class);
- private TermSource[] sources;
- private int sourcesCount = 0;
- protected Text nullText = new Text();
- protected final byte[] emptyByteArray = new byte[0];
- private Key topKey = null;
- protected Value value = new Value(emptyByteArray);
- private Range overallRange;
- private Text currentRow = null;
- private Text currentTerm = new Text(emptyByteArray);
- private Text currentDocID = new Text(emptyByteArray);
- private Collection<ByteSequence> seekColumnFamilies;
- private boolean inclusive;
- private Text parentEndRow;
-
- /**
- * Used in representing a Term that is intersected on.
- */
- protected static class TermSource {
-
- public SortedKeyValueIterator<Key, Value> iter;
- public Text dataLocation;
- public Text term;
- public boolean notFlag;
-
- public TermSource(TermSource other) {
- this.iter = other.iter;
- this.dataLocation = other.dataLocation;
- this.term = other.term;
- this.notFlag = other.notFlag;
- }
-
- public TermSource(SortedKeyValueIterator<Key, Value> iter, Text dataLocation, Text term) {
- this.iter = iter;
- this.dataLocation = dataLocation;
- this.term = term;
- this.notFlag = false;
- }
-
- public TermSource(SortedKeyValueIterator<Key, Value> iter, Text dataLocation, Text term, boolean notFlag) {
- this.iter = iter;
- this.dataLocation = dataLocation;
- this.term = term;
- this.notFlag = notFlag;
- }
-
- public String getTermString() {
- return (this.term == null) ? new String("Iterator") : this.term.toString();
- }
- }
-
-
- /* | Row | Column Family | Column Qualifier | Value
- * | {RowID} | {dataLocation} | {term}\0{dataType}\0{UID} | Empty
- */
- protected Text getPartition(Key key) {
- return key.getRow();
- }
-
- /**
- * Returns the given key's dataLocation
- * @param key
- * @return
- */
- protected Text getDataLocation(Key key) {
- return key.getColumnFamily();
- }
-
- /**
- * Returns the given key's term
- * @param key
- * @return
- */
- protected Text getTerm(Key key) {
- int idx = 0;
- String sKey = key.getColumnQualifier().toString();
-
- idx = sKey.indexOf("\0");
- return new Text(sKey.substring(0, idx));
- }
-
- /**
- * Returns the given key's DocID
- * @param key
- * @return
- */
- protected Text getDocID(Key key) {
- int idx = 0;
- String sKey = key.getColumnQualifier().toString();
-
- idx = sKey.indexOf("\0");
- return new Text(sKey.substring(idx + 1));
- }
-
- /**
- * Returns the given key's UID
- * @param key
- * @return
- */
- protected String getUID(Key key) {
- int idx = 0;
- String sKey = key.getColumnQualifier().toString();
-
- idx = sKey.lastIndexOf("\0");
- return sKey.substring(idx + 1);
- }
-
- /**
- * Build a key from the given row and dataLocation
- * @param row The desired row
- * @param dataLocation The desired dataLocation
- * @return
- */
- protected Key buildKey(Text row, Text dataLocation) {
- return new Key(row, (dataLocation == null) ? nullText : dataLocation);
- }
-
- /**
- * Build a key from the given row, dataLocation, and term
- * @param row The desired row
- * @param dataLocation The desired dataLocation
- * @param term The desired term
- * @return
- */
- protected Key buildKey(Text row, Text dataLocation, Text term) {
- return new Key(row,
- (dataLocation == null) ? nullText : dataLocation,
- (term == null) ? nullText : term);
- }
-
- /**
- * Return the key that directly follows the given key
- * @param key The key who will be directly before the returned key
- * @return
- */
- protected Key buildFollowingPartitionKey(Key key) {
- return key.followingKey(PartialKey.ROW);
- }
-
- /**
- * Empty default constructor
- */
- public AndIterator() {
- }
-
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- return new AndIterator(this, env);
- }
-
- public AndIterator(AndIterator other, IteratorEnvironment env) {
- if (other.sources != null) {
- sourcesCount = other.sourcesCount;
- sources = new TermSource[sourcesCount];
- for (int i = 0; i < sourcesCount; i++) {
- sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].dataLocation, other.sources[i].term);
- }
- }
- }
-
- public Key getTopKey() {
- return topKey;
- }
-
- public Value getTopValue() {
- return value;
- }
-
- public boolean hasTop() {
- return currentRow != null;
- }
-
- /**
- * Find the next key in the current TermSource that is at or beyond
- * the cursor (currentRow, currentTerm, currentDocID).
- * @param sourceID The index of the current source in <code>sources</code>
- * @return True if the source advanced beyond the cursor
- * @throws IOException
- */
- private boolean seekOneSource(TermSource ts) throws IOException {
- /* Within this loop progress must be made in one of the following forms:
- * - currentRow, currentTerm, or curretDocID must be increased
- * - the given source must advance its iterator
- * This loop will end when any of the following criteria are met
- * - the iterator for the given source is pointing to the key (currentRow, columnFamilies[sourceID], currentTerm, currentDocID)
- * - the given source is out of data and currentRow is set to null
- * - the given source has advanced beyond the endRow and currentRow is set to null
- */
-
- // precondition: currentRow is not null
- boolean advancedCursor = false;
-
- while (true) {
- if (ts.iter.hasTop() == false) {
- if (log.isDebugEnabled()) {
- log.debug("The current iterator no longer has a top");
- }
-
- // If we got to the end of an iterator, found a Match if it's a NOT
- if (ts.notFlag) {
- break;
- }
-
- currentRow = null;
- // setting currentRow to null counts as advancing the cursor
- return true;
- }
-
- // check if we're past the end key
- int endCompare = -1;
-
- if (log.isDebugEnabled()) {
- log.debug("Current topKey = " + ts.iter.getTopKey());
- }
-
- // we should compare the row to the end of the range
- if (overallRange.getEndKey() != null) {
- if (log.isDebugEnabled()) {
- log.debug("II.seekOneSource overallRange.getEndKey() != null");
- }
-
- endCompare = overallRange.getEndKey().getRow().compareTo(ts.iter.getTopKey().getRow());
-
- if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
- if (log.isDebugEnabled()) {
- log.debug("II.seekOneSource at the end of the tablet server");
- }
-
- currentRow = null;
-
- // setting currentRow to null counts as advancing the cursor
- return true;
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("II.seekOneSource overallRange.getEndKey() == null");
- }
- }
-
- // Compare the Row IDs
- int partitionCompare = currentRow.compareTo(getPartition(ts.iter.getTopKey()));
- if (log.isDebugEnabled()) {
- log.debug("Current partition: " + currentRow);
- }
-
- // check if this source is already at or beyond currentRow
- // if not, then seek to at least the current row
- if (partitionCompare > 0) {
- if (log.isDebugEnabled()) {
- log.debug("Need to seek to the current row");
-
- // seek to at least the currentRow
- log.debug("ts.dataLocation = " + ts.dataLocation.getBytes());
- log.debug("Term = " + new Text(ts.term + "\0" + currentDocID).getBytes());
- }
-
- Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);//new Text(ts.term + "\0" + currentDocID));
-
- if (log.isDebugEnabled()) {
- log.debug("Seeking to: " + seekKey);
- }
-
- ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
- continue;
- }
-
- // check if this source has gone beyond currentRow
- // if so, advance currentRow
- if (partitionCompare < 0) {
- if (log.isDebugEnabled()) {
- log.debug("Went too far beyond the currentRow");
- }
-
- if (ts.notFlag) {
- break;
- }
-
- currentRow.set(getPartition(ts.iter.getTopKey()));
- currentDocID.set(emptyByteArray);
-
- advancedCursor = true;
- continue;
- }
-
- // we have verified that the current source is positioned in currentRow
- // now we must make sure we're in the right columnFamily in the current row
- if (ts.dataLocation != null) {
- int dataLocationCompare = ts.dataLocation.compareTo(getDataLocation(ts.iter.getTopKey()));
-
- if (log.isDebugEnabled()) {
- log.debug("Comparing dataLocations");
- log.debug("dataLocation = " + ts.dataLocation);
- log.debug("newDataLocation = " + getDataLocation(ts.iter.getTopKey()));
- }
-
- // check if this source is already on the right columnFamily
- // if not, then seek forwards to the right columnFamily
- if (dataLocationCompare > 0) {
- if (log.isDebugEnabled()) {
- log.debug("Need to seek to the right dataLocation");
- }
-
- Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);//, new Text(ts.term + "\0" + currentDocID));
-
- if (log.isDebugEnabled()) {
- log.debug("Seeking to: " + seekKey);
- }
-
- ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
- if(!ts.iter.hasTop()){
- currentRow = null;
- return true;
- }
-
- continue;
- }
- // check if this source is beyond the right columnFamily
- // if so, then seek to the next row
- if (dataLocationCompare < 0) {
- if (log.isDebugEnabled()) {
- log.debug("Went too far beyond the dataLocation");
- }
-
- if (endCompare == 0) {
- // we're done
- currentRow = null;
-
- // setting currentRow to null counts as advancing the cursor
- return true;
- }
-
- // Seeking beyond the current dataLocation gives a valid negated result
- if (ts.notFlag) {
- break;
- }
-
- Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
-
- if (log.isDebugEnabled()) {
- log.debug("Seeking to: " + seekKey);
- }
-
- ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
- if(!ts.iter.hasTop()){
- currentRow = null;
- return true;
- }
- continue;
- }
- }
-
- // Compare the Terms
- int termCompare = ts.term.compareTo(getTerm(ts.iter.getTopKey()));
- if (log.isDebugEnabled()) {
- log.debug("term = " + ts.term);
- log.debug("newTerm = " + getTerm(ts.iter.getTopKey()));
- }
-
- // We need to seek down farther into the data
- if (termCompare > 0) {
- if (log.isDebugEnabled()) {
- log.debug("Need to seek to the right term");
- }
-
- Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0"));//new Text(ts.term + "\0" + currentDocID));
-
- if (log.isDebugEnabled()) {
- log.debug("Seeking to: " + seekKey);
- }
-
- ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
- if (!ts.iter.hasTop()) {
- currentRow = null;
- return true;
- }
-
- //currentTerm = getTerm(ts.iter.getTopKey());
-
- if (log.isDebugEnabled()) {
- log.debug("topKey after seeking to correct term: " + ts.iter.getTopKey());
- }
-
- continue;
- }
-
- // We've jumped out of the current term, set the new term as currentTerm and start looking again
- if (termCompare < 0) {
- if (log.isDebugEnabled()) {
- log.debug("TERM: Need to jump to the next row");
- }
-
- if (endCompare == 0) {
- currentRow = null;
-
- return true;
- }
-
- if (ts.notFlag) {
- break;
- }
-
- Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
- if (log.isDebugEnabled()) {
- log.debug("Using this key to find the next key: " + ts.iter.getTopKey());
- log.debug("Seeking to: " + seekKey);
- }
-
- ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
-
- if (!ts.iter.hasTop()) {
- currentRow = null;
- return true;
- }
-
- currentTerm = getTerm(ts.iter.getTopKey());
-
- continue;
- }
-
- // Compare the DocIDs
- Text docid = getDocID(ts.iter.getTopKey());
- int docidCompare = currentDocID.compareTo(docid);
-
- if (log.isDebugEnabled()) {
- log.debug("Comparing DocIDs");
- log.debug("currentDocID = " + currentDocID);
- log.debug("docid = " + docid);
- }
-
- // The source isn't at the right DOC
- if (docidCompare > 0) {
- if (log.isDebugEnabled()) {
- log.debug("Need to seek to the correct docid");
- }
-
- // seek forwards
- Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0" + currentDocID));
-
- if (log.isDebugEnabled()) {
- log.debug("Seeking to: " + seekKey);
- }
-
- ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
-
- continue;
- }
-
- // if this source has advanced beyond the current column qualifier then advance currentCQ and return true
- if (docidCompare < 0) {
- if (ts.notFlag) {
- break;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("We went too far, update the currentDocID to be the location of where were seek'ed to");
- }
-
- currentDocID.set(docid);
- advancedCursor = true;
- break;
- }
-
- // Set the term as currentTerm (in case we found this record on the first try)
- currentTerm = getTerm(ts.iter.getTopKey());
-
- if (log.isDebugEnabled()) {
- log.debug("currentTerm = " + currentTerm);
- }
-
- // If we're negated, next() the first TermSource since we guaranteed it was not a NOT term
- if (ts.notFlag) {
- sources[0].iter.next();
- advancedCursor = true;
- }
-
- // If we got here, we have a match
- break;
- }
-
- return advancedCursor;
- }
-
- public void next() throws IOException {
+public class AndIterator implements SortedKeyValueIterator<Key,Value> {
+
+ protected static final Logger log = Logger.getLogger(AndIterator.class);
+ private TermSource[] sources; // term sources being intersected; filled by init() or the copy constructor
+ private int sourcesCount = 0; // number of entries in sources
+ protected Text nullText = new Text(); // empty Text substituted for null key parts in buildKey()
+ protected final byte[] emptyByteArray = new byte[0]; // used to reset currentDocID and to initialise value
+ private Key topKey = null; // current match; rebuilt by advanceToIntersection()
+ protected Value value = new Value(emptyByteArray); // returned by getTopValue(); initialised empty
+ private Range overallRange; // copy of the range handed to seek()
+ private Text currentRow = null; // cursor row; hasTop() tests this against null
+ private Text currentTerm = new Text(emptyByteArray); // cursor term
+ private Text currentDocID = new Text(emptyByteArray); // cursor document id
+ private Collection<ByteSequence> seekColumnFamilies; // saved from seek() and reused when re-seeking a source
+ private boolean inclusive; // saved from seek() together with seekColumnFamilies
+ private Text parentEndRow; // end row of the seek range, when the range has an end key
+
+  /**
+   * One term taking part in the intersection: the iterator that scans for it, the column family (dataLocation) it is stored under, the term text, and whether
+   * the term is negated (NOT).
+   */
+  protected static class TermSource {
+
+    public SortedKeyValueIterator<Key,Value> iter;
+    public Text dataLocation;
+    public Text term;
+    public boolean notFlag;
+
+    /**
+     * Copies every field of <code>other</code>. The iterator and Text references are shared, not cloned.
+     *
+     * @param other
+     *          The term source to copy
+     */
+    public TermSource(TermSource other) {
+      this(other.iter, other.dataLocation, other.term, other.notFlag);
+    }
+
+    /**
+     * Creates a term source that is not negated.
+     */
+    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text dataLocation, Text term) {
+      this(iter, dataLocation, term, false);
+    }
+
+    /**
+     * @param iter
+     *          The iterator that is positioned on keys for this term
+     * @param dataLocation
+     *          The column family the term is stored under
+     * @param term
+     *          The term text
+     * @param notFlag
+     *          True if the term is negated
+     */
+    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text dataLocation, Text term, boolean notFlag) {
+      this.iter = iter;
+      this.dataLocation = dataLocation;
+      this.term = term;
+      this.notFlag = notFlag;
+    }
+
+    /**
+     * @return The term as a String, or the placeholder "Iterator" when no term is set
+     */
+    public String getTermString() {
+      // A plain literal is enough; wrapping it in new String(...) only allocated a redundant copy.
+      return (this.term == null) ? "Iterator" : this.term.toString();
+    }
+  }
+
+  /*
+   * Layout of the keys this iterator works on:
+   *
+   * | Row     | Column Family  | Column Qualifier          | Value |
+   * | {RowID} | {dataLocation} | {term}\0{dataType}\0{UID} | Empty |
+   */
+  /**
+   * Returns the given key's partition, which is its row.
+   *
+   * @param key
+   *          The key to inspect
+   * @return The row of the key
+   */
+  protected Text getPartition(Key key) {
+    final Text partition = key.getRow();
+    return partition;
+  }
+
+  /**
+   * Returns the given key's dataLocation, which is its column family.
+   *
+   * @param key
+   *          The key to inspect
+   * @return The column family of the key
+   */
+  protected Text getDataLocation(Key key) {
+    final Text location = key.getColumnFamily();
+    return location;
+  }
+
+  /**
+   * Returns the given key's term: the part of the column qualifier that precedes the first null byte.
+   *
+   * @param key
+   *          The key to inspect
+   * @return The term; the whole column qualifier when it contains no null byte
+   */
+  protected Text getTerm(Key key) {
+    String qualifier = key.getColumnQualifier().toString();
+    int separator = qualifier.indexOf('\0');
+
+    // Without this guard a qualifier lacking the separator reached substring(0, -1),
+    // which throws StringIndexOutOfBoundsException and aborts the scan.
+    if (separator < 0) {
+      return new Text(qualifier);
+    }
+
+    return new Text(qualifier.substring(0, separator));
+  }
+
+  /**
+   * Returns the given key's DocID: everything in the column qualifier after the first null byte.
+   *
+   * @param key
+   *          The key to inspect
+   * @return The DocID; the whole column qualifier when it contains no null byte
+   */
+  protected Text getDocID(Key key) {
+    String qualifier = key.getColumnQualifier().toString();
+    int firstSeparator = qualifier.indexOf("\0");
+    String docId = qualifier.substring(firstSeparator + 1);
+    return new Text(docId);
+  }
+
+  /**
+   * Returns the given key's UID: everything in the column qualifier after the last null byte.
+   *
+   * @param key
+   *          The key to inspect
+   * @return The UID; the whole column qualifier when it contains no null byte
+   */
+  protected String getUID(Key key) {
+    String qualifier = key.getColumnQualifier().toString();
+    int lastSeparator = qualifier.lastIndexOf("\0");
+    return qualifier.substring(lastSeparator + 1);
+  }
+
+  /**
+   * Build a key from the given row and dataLocation.
+   *
+   * @param row
+   *          The desired row
+   * @param dataLocation
+   *          The desired dataLocation; <code>nullText</code> is used when this is null
+   * @return A key made of the row and the dataLocation as column family
+   */
+  protected Key buildKey(Text row, Text dataLocation) {
+    Text columnFamily = dataLocation;
+    if (columnFamily == null) {
+      columnFamily = nullText;
+    }
+    return new Key(row, columnFamily);
+  }
+
+  /**
+   * Build a key from the given row, dataLocation, and term.
+   *
+   * @param row
+   *          The desired row
+   * @param dataLocation
+   *          The desired dataLocation; <code>nullText</code> is used when this is null
+   * @param term
+   *          The desired term; <code>nullText</code> is used when this is null
+   * @return A key made of the row, the dataLocation as column family and the term as column qualifier
+   */
+  protected Key buildKey(Text row, Text dataLocation, Text term) {
+    Text columnFamily = dataLocation;
+    if (columnFamily == null) {
+      columnFamily = nullText;
+    }
+
+    Text columnQualifier = term;
+    if (columnQualifier == null) {
+      columnQualifier = nullText;
+    }
+
+    return new Key(row, columnFamily, columnQualifier);
+  }
+
+  /**
+   * Return the key that follows the given key at row granularity (<code>Key.followingKey(PartialKey.ROW)</code>). Used to seek a source past the
+   * partition it is currently in.
+   *
+   * @param key
+   *          The key whose row the returned key follows
+   * @return The following key
+   */
+  protected Key buildFollowingPartitionKey(Key key) {
+    final Key following = key.followingKey(PartialKey.ROW);
+    return following;
+  }
+
+  /**
+   * Default constructor. It sets nothing up; the term sources are created later by init().
+   */
+  public AndIterator() {
+    // intentionally empty
+  }
+
+  /**
+   * Creates a copy of this iterator through the copy constructor.
+   *
+   * @param env
+   *          The environment handed to the deepCopy of every underlying source
+   * @return The new AndIterator
+   */
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    AndIterator copy = new AndIterator(this, env);
+    return copy;
+  }
+
+  /**
+   * Copy constructor used by deepCopy(). Each term source gets a deep copy of its underlying iterator; the dataLocation and term Text objects are shared
+   * with <code>other</code>. Cursor state (current row, term, docID, range) is not copied.
+   *
+   * @param other
+   *          The iterator to copy
+   * @param env
+   *          The environment handed to the deepCopy of every underlying source
+   */
+  public AndIterator(AndIterator other, IteratorEnvironment env) {
+    if (other.sources != null) {
+      sourcesCount = other.sourcesCount;
+      sources = new TermSource[sourcesCount];
+      for (int i = 0; i < sourcesCount; i++) {
+        TermSource original = other.sources[i];
+        // Carry the NOT flag across: the three-argument constructor resets it to false,
+        // which turned every negated term of the copy into a plain AND term.
+        sources[i] = new TermSource(original.iter.deepCopy(env), original.dataLocation, original.term, original.notFlag);
+      }
+    }
+  }
+
+  /**
+   * @return The key of the current match, as last set by advanceToIntersection(), next() or seek()
+   */
+  public Key getTopKey() {
+    return this.topKey;
+  }
+
+  /**
+   * @return The <code>value</code> field, which this class initialises to an empty Value
+   */
+  public Value getTopValue() {
+    return this.value;
+  }
+
+  /**
+   * @return True only while there is a current row AND a top key to hand out
+   */
+  public boolean hasTop() {
+    // next() clears topKey, but not currentRow, when the match falls outside overallRange.
+    // Checking currentRow alone reported a top while getTopKey() returned null.
+    return currentRow != null && topKey != null;
+  }
+
+ /**
+ * Find the next key in the given TermSource that is at or beyond the cursor (currentRow, currentTerm, currentDocID).
+ *
+ * @param ts
+ * The term source to position
+ * @return True if the source advanced beyond the cursor
+ * @throws IOException
+ */
+ private boolean seekOneSource(TermSource ts) throws IOException {
+ /*
+ * Within this loop progress must be made in one of the following forms:
+ * - currentRow, currentTerm, or currentDocID must be increased
+ * - the given source must advance its iterator
+ * This loop will end when any of the following criteria are met:
+ * - the iterator for the given source is pointing to the key (currentRow, ts.dataLocation, currentTerm, currentDocID)
+ * - the given source is out of data and currentRow is set to null
+ * - the given source has advanced beyond the endRow and currentRow is set to null
+ */
+
+ // precondition: currentRow is not null
+ boolean advancedCursor = false;
+
+ while (true) {
+ if (ts.iter.hasTop() == false) {
if (log.isDebugEnabled()) {
- log.debug("In ModifiedIntersectingIterator.next()");
- }
-
- if (currentRow == null) {
- return;
- }
-
- // precondition: the current row is set up and the sources all have the same column qualifier
- // while we don't have a match, seek in the source with the smallest column qualifier
- sources[0].iter.next();
-
- advanceToIntersection();
-
- if (hasTop()) {
- if (overallRange != null && !overallRange.contains(topKey)) {
- topKey = null;
- }
+ log.debug("The current iterator no longer has a top");
}
- }
-
- protected void advanceToIntersection() throws IOException {
+
+ // If we got to the end of an iterator, found a Match if it's a NOT
+ if (ts.notFlag) {
+ break;
+ }
+
+ currentRow = null;
+ // setting currentRow to null counts as advancing the cursor
+ return true;
+ }
+
+ // check if we're past the end key
+ int endCompare = -1;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Current topKey = " + ts.iter.getTopKey());
+ }
+
+ // we should compare the row to the end of the range
+ if (overallRange.getEndKey() != null) {
if (log.isDebugEnabled()) {
- log.debug("In AndIterator.advanceToIntersection()");
+ log.debug("II.seekOneSource overallRange.getEndKey() != null");
}
-
- boolean cursorChanged = true;
- int numSeeks = 0;
- while (cursorChanged) {
- // seek all of the sources to at least the highest seen column qualifier in the current row
- cursorChanged = false;
- for (TermSource ts : sources) {
- if (currentRow == null) {
- topKey = null;
- return;
- }
- numSeeks++;
- if (seekOneSource(ts)) {
- cursorChanged = true;
- break;
- }
- }
+
+ endCompare = overallRange.getEndKey().getRow().compareTo(ts.iter.getTopKey().getRow());
+
+ if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("II.seekOneSource at the end of the tablet server");
+ }
+
+ currentRow = null;
+
+ // setting currentRow to null counts as advancing the cursor
+ return true;
}
-
- topKey = buildKey(currentRow, currentTerm, currentDocID);
-
+ } else {
if (log.isDebugEnabled()) {
- log.debug("ModifiedIntersectingIterator: Got a match: " + topKey);
- }
- }
-
- public static String stringTopKey(SortedKeyValueIterator<Key, Value> iter) {
- if (iter.hasTop()) {
- return iter.getTopKey().toString();
- }
- return "";
- }
- public static final String columnFamiliesOptionName = "columnFamilies";
- public static final String termValuesOptionName = "termValues";
- public static final String notFlagsOptionName = "notFlags";
-
- /**
- * Encode a <code>Text</code> array of all the columns to intersect on
- * @param columns The columns to be encoded
- * @return
- */
- public static String encodeColumns(Text[] columns) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < columns.length; i++) {
- sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i]))));
- sb.append('\n');
- }
- return sb.toString();
- }
-
- /**
- * Encode a <code>Text</code> array of all of the terms to intersect on. The terms should match the columns
- * in a one-to-one manner
- * @param terms The terms to be encoded
- * @return
- */
- public static String encodeTermValues(Text[] terms) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < terms.length; i++) {
- sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(terms[i]))));
- sb.append('\n');
+ log.debug("II.seekOneSource overallRange.getEndKey() == null");
}
-
- return sb.toString();
- }
-
- /**
- * Encode an array of <code>booleans</code> denoted which columns are NOT'ed
- * @param flags The array of NOTs
- * @return
- */
- public static String encodeBooleans(boolean[] flags) {
- byte[] bytes = new byte[flags.length];
- for (int i = 0; i < flags.length; i++) {
- if (flags[i]) {
- bytes[i] = 1;
- } else {
- bytes[i] = 0;
- }
+ }
+
+ // Compare the Row IDs
+ int partitionCompare = currentRow.compareTo(getPartition(ts.iter.getTopKey()));
+ if (log.isDebugEnabled()) {
+ log.debug("Current partition: " + currentRow);
+ }
+
+ // check if this source is already at or beyond currentRow
+ // if not, then seek to at least the current row
+ if (partitionCompare > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Need to seek to the current row");
+
+ // seek to at least the currentRow
+ log.debug("ts.dataLocation = " + ts.dataLocation.getBytes());
+ log.debug("Term = " + new Text(ts.term + "\0" + currentDocID).getBytes());
+ }
+
+ Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);// new Text(ts.term + "\0" + currentDocID));
+
+ if (log.isDebugEnabled()) {
+ log.debug("Seeking to: " + seekKey);
}
- return new String(Base64.encodeBase64(bytes));
- }
-
- /**
- * Decode the encoded columns into a <code>Text</code> array
- * @param columns The Base64 encoded String of the columns
- * @return
- */
- public static Text[] decodeColumns(String columns) {
- String[] columnStrings = columns.split("\n");
- Text[] columnTexts = new Text[columnStrings.length];
- for (int i = 0; i < columnStrings.length; i++) {
- columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes()));
+
+ ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ continue;
+ }
+
+ // check if this source has gone beyond currentRow
+ // if so, advance currentRow
+ if (partitionCompare < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Went too far beyond the currentRow");
}
-
- return columnTexts;
- }
-
- /**
- * Decode the encoded terms into a <code>Text</code> array
- * @param terms The Base64 encoded String of the terms
- * @return
- */
- public static Text[] decodeTermValues(String terms) {
- String[] termStrings = terms.split("\n");
- Text[] termTexts = new Text[termStrings.length];
- for (int i = 0; i < termStrings.length; i++) {
- termTexts[i] = new Text(Base64.decodeBase64(termStrings[i].getBytes()));
+
+ if (ts.notFlag) {
+ break;
}
-
- return termTexts;
- }
-
- /**
- * Decode the encoded NOT flags into a <code>boolean</code> array
- * @param flags
- * @return
- */
- public static boolean[] decodeBooleans(String flags) {
- // return null of there were no flags
- if (flags == null) {
- return null;
+
+ currentRow.set(getPartition(ts.iter.getTopKey()));
+ currentDocID.set(emptyByteArray);
+
+ advancedCursor = true;
+ continue;
+ }
+
+ // we have verified that the current source is positioned in currentRow
+ // now we must make sure we're in the right columnFamily in the current row
+ if (ts.dataLocation != null) {
+ int dataLocationCompare = ts.dataLocation.compareTo(getDataLocation(ts.iter.getTopKey()));
+
+ if (log.isDebugEnabled()) {
+ log.debug("Comparing dataLocations");
+ log.debug("dataLocation = " + ts.dataLocation);
+ log.debug("newDataLocation = " + getDataLocation(ts.iter.getTopKey()));
+ }
+
+ // check if this source is already on the right columnFamily
+ // if not, then seek forwards to the right columnFamily
+ if (dataLocationCompare > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Need to seek to the right dataLocation");
+ }
+
+ Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);// , new Text(ts.term + "\0" + currentDocID));
+
+ if (log.isDebugEnabled()) {
+ log.debug("Seeking to: " + seekKey);
+ }
+
+ ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ if (!ts.iter.hasTop()) {
+ currentRow = null;
+ return true;
+ }
+
+ continue;
+ }
+ // check if this source is beyond the right columnFamily
+ // if so, then seek to the next row
+ if (dataLocationCompare < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Went too far beyond the dataLocation");
+ }
+
+ if (endCompare == 0) {
+ // we're done
+ currentRow = null;
+
+ // setting currentRow to null counts as advancing the cursor
+ return true;
+ }
+
+ // Seeking beyond the current dataLocation gives a valid negated result
+ if (ts.notFlag) {
+ break;
+ }
+
+ Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Seeking to: " + seekKey);
+ }
+
+ ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ if (!ts.iter.hasTop()) {
+ currentRow = null;
+ return true;
+ }
+ continue;
+ }
+ }
+
+ // Compare the Terms
+ int termCompare = ts.term.compareTo(getTerm(ts.iter.getTopKey()));
+ if (log.isDebugEnabled()) {
+ log.debug("term = " + ts.term);
+ log.debug("newTerm = " + getTerm(ts.iter.getTopKey()));
+ }
+
+ // We need to seek down farther into the data
+ if (termCompare > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Need to seek to the right term");
}
-
- byte[] bytes = Base64.decodeBase64(flags.getBytes());
- boolean[] bFlags = new boolean[bytes.length];
- for (int i = 0; i < bytes.length; i++) {
- if (bytes[i] == 1) {
- bFlags[i] = true;
- } else {
- bFlags[i] = false;
- }
+
+ Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0"));// new Text(ts.term + "\0" + currentDocID));
+
+ if (log.isDebugEnabled()) {
+ log.debug("Seeking to: " + seekKey);
}
-
- return bFlags;
- }
-
- public void init(SortedKeyValueIterator<Key, Value> source,
- Map<String, String> options, IteratorEnvironment env) throws IOException {
+
+ ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+ if (!ts.iter.hasTop()) {
+ currentRow = null;
+ return true;
+ }
+
+ // currentTerm = getTerm(ts.iter.getTopKey());
+
if (log.isDebugEnabled()) {
- log.debug("In AndIterator.init()");
+ log.debug("topKey after seeking to correct term: " + ts.iter.getTopKey());
}
-
- Text[] dataLocations = decodeColumns(options.get(columnFamiliesOptionName));
- Text[] terms = decodeTermValues(options.get(termValuesOptionName));
- boolean[] notFlags = decodeBooleans(options.get(notFlagsOptionName));
-
- if (terms.length < 2) {
- throw new IllegalArgumentException("AndIterator requires two or more columns families");
+
+ continue;
+ }
+
+ // We've jumped out of the current term, set the new term as currentTerm and start looking again
+ if (termCompare < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("TERM: Need to jump to the next row");
}
-
- // Scan the not flags.
- // There must be at least one term that isn't negated
- // And we are going to re-order such that the first term is not a ! term
- if (notFlags == null) {
- notFlags = new boolean[terms.length];
- for (int i = 0; i < terms.length; i++) {
- notFlags[i] = false;
- }
+
+ if (endCompare == 0) {
+ currentRow = null;
+
+ return true;
+ }
+
+ if (ts.notFlag) {
+ break;
}
-
- // Make sure that the first dataLocation/Term is not a NOT by swapping it with a later dataLocation/Term
- if (notFlags[0]) {
- for (int i = 1; i < notFlags.length; i++) {
- if (notFlags[i] == false) {
- // Swap the terms
- Text swap = new Text(terms[0]);
- terms[0].set(terms[i]);
- terms[i].set(swap);
-
- // Swap the dataLocations
- swap.set(dataLocations[0]);
- dataLocations[0].set(dataLocations[i]);
- dataLocations[i].set(swap);
-
- // Flip the notFlags
- notFlags[0] = false;
- notFlags[i] = true;
- break;
- }
- }
-
- if (notFlags[0]) {
- throw new IllegalArgumentException("AndIterator requires at least one column family without not");
- }
+
+ Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
+ if (log.isDebugEnabled()) {
+ log.debug("Using this key to find the next key: " + ts.iter.getTopKey());
+ log.debug("Seeking to: " + seekKey);
}
-
- // Build up the array of sources that are to be intersected
- sources = new TermSource[dataLocations.length];
- sources[0] = new TermSource(source, dataLocations[0], terms[0]);
- for (int i = 1; i < dataLocations.length; i++) {
- sources[i] = new TermSource(source.deepCopy(env), dataLocations[i], terms[i], notFlags[i]);
+
+ ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+
+ if (!ts.iter.hasTop()) {
+ currentRow = null;
+ return true;
+ }
+
+ currentTerm = getTerm(ts.iter.getTopKey());
+
+ continue;
+ }
+
+ // Compare the DocIDs
+ Text docid = getDocID(ts.iter.getTopKey());
+ int docidCompare = currentDocID.compareTo(docid);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Comparing DocIDs");
+ log.debug("currentDocID = " + currentDocID);
+ log.debug("docid = " + docid);
+ }
+
+ // The source isn't at the right DOC
+ if (docidCompare > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Need to seek to the correct docid");
}
-
- sourcesCount = dataLocations.length;
- }
-
- public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+
+ // seek forwards
+ Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0" + currentDocID));
+
if (log.isDebugEnabled()) {
- log.debug("In AndIterator.seek()");
- log.debug("AndIterator.seek Given range => " + range);
+ log.debug("Seeking to: " + seekKey);
}
- //if (firstSeek) {
- overallRange = new Range(range);
- //firstSeek = false;
- //}
- if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
- this.parentEndRow = range.getEndKey().getRow();
+
+ ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+
+ continue;
+ }
+
+ // if this source has advanced beyond the current column qualifier then advance currentCQ and return true
+ if (docidCompare < 0) {
+ if (ts.notFlag) {
+ break;
}
-
- //overallRange = new Range(range);
- currentRow = new Text();
- currentDocID.set(emptyByteArray);
-
- this.seekColumnFamilies = seekColumnFamilies;
- this.inclusive = inclusive;
-
- // seek each of the sources to the right column family within the row given by key
- for (int i = 0; i < sourcesCount; i++) {
- Key sourceKey;
- if (range.getStartKey() != null) {
- // Build a key with the DocID if one is given
- if (range.getStartKey().getColumnQualifier() != null) {
- sourceKey = buildKey(getPartition(range.getStartKey()),
- (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation,
- (sources[i].term == null) ? nullText : new Text(sources[i].term + "\0" + range.getStartKey().getColumnQualifier()));
- } // Build a key with just the term.
- else {
- sourceKey = buildKey(getPartition(range.getStartKey()),
- (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation,
- (sources[i].term == null) ? nullText : sources[i].term);
- }
-
- sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive);
- } else {
- sources[i].iter.seek(range, seekColumnFamilies, inclusive);
- }
+
+ if (log.isDebugEnabled()) {
+ log.debug("We went too far, update the currentDocID to be the location of where were seek'ed to");
}
-
- advanceToIntersection();
-
- if (hasTop()) {
- if (overallRange != null && !overallRange.contains(topKey)) {
- topKey = null;
- if (log.isDebugEnabled()) {
- log.debug("seek, topKey is outside of overall range: " + overallRange);
- }
- }
+
+ currentDocID.set(docid);
+ advancedCursor = true;
+ break;
+ }
+
+ // Set the term as currentTerm (in case we found this record on the first try)
+ currentTerm = getTerm(ts.iter.getTopKey());
+
+ if (log.isDebugEnabled()) {
+ log.debug("currentTerm = " + currentTerm);
+ }
+
+ // If we're negated, next() the first TermSource since we guaranteed it was not a NOT term
+ if (ts.notFlag) {
+ sources[0].iter.next();
+ advancedCursor = true;
+ }
+
+ // If we got here, we have a match
+ break;
+ }
+
+ return advancedCursor;
+ }
+
+  /**
+   * Moves to the next intersection: steps the first source forward, re-aligns all sources, and drops the top key if it lies outside the overall range. Does
+   * nothing when there is no current row.
+   *
+   * @throws IOException
+   */
+  public void next() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("In ModifiedIntersectingIterator.next()");
+    }
+
+    if (currentRow != null) {
+      // precondition: the current row is set up and the sources all have the same column qualifier
+      sources[0].iter.next();
+
+      advanceToIntersection();
+
+      boolean outsideRange = hasTop() && overallRange != null && !overallRange.contains(topKey);
+      if (outsideRange) {
+        topKey = null;
+      }
+    }
+  }
+
+ protected void advanceToIntersection() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("In AndIterator.advanceToIntersection()");
+ }
+
+ boolean cursorChanged = true;
+ int numSeeks = 0;
+ while (cursorChanged) {
+ // seek all of the sources to at least the highest seen column qualifier in the current row
+ cursorChanged = false;
+ for (TermSource ts : sources) {
+ if (currentRow == null) {
+ topKey = null;
+ return;
}
- }
-
- public void addSource(SortedKeyValueIterator<Key, Value> source, IteratorEnvironment env,
- Text term, boolean notFlag) {
- addSource(source, env, null, term, notFlag);
- }
-
- public void addSource(SortedKeyValueIterator<Key, Value> source, IteratorEnvironment env,
- Text dataLocation, Text term, boolean notFlag) {
- // Check if we have space for the added Source
- if (sources == null) {
- sources = new TermSource[1];
- } else {
- // allocate space for node, and copy current tree.
- // TODO: Should we change this to an ArrayList so that we can just add() ?
- TermSource[] localSources = new TermSource[sources.length + 1];
- int currSource = 0;
- for (TermSource myTerm : sources) {
- // TODO: Do I need to call new here? or can I just re-use the term?
- localSources[currSource] = new TermSource(myTerm);
- currSource++;
- }
- sources = localSources;
+ numSeeks++;
+ if (seekOneSource(ts)) {
+ cursorChanged = true;
+ break;
+ }
+ }
+ }
+
+ topKey = buildKey(currentRow, currentTerm, currentDocID);
+
+ if (log.isDebugEnabled()) {
+ log.debug("ModifiedIntersectingIterator: Got a match: " + topKey);
+ }
+ }
+
+  /**
+   * @param iter
+   *          The iterator to describe
+   * @return The iterator's top key as a String, or an empty String when it has no top
+   */
+  public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
+    return iter.hasTop() ? iter.getTopKey().toString() : "";
+  }
+
+ public static final String columnFamiliesOptionName = "columnFamilies";
+ public static final String termValuesOptionName = "termValues";
+ public static final String notFlagsOptionName = "notFlags";
+
+  /**
+   * Encode a <code>Text</code> array of all the columns to intersect on. Each column is Base64 encoded and followed by a newline.
+   *
+   * @param columns
+   *          The columns to be encoded
+   * @return The newline-terminated Base64 lines, one per column
+   */
+  public static String encodeColumns(Text[] columns) {
+    StringBuilder encoded = new StringBuilder();
+    for (Text column : columns) {
+      byte[] base64 = Base64.encodeBase64(TextUtil.getBytes(column));
+      encoded.append(new String(base64)).append('\n');
+    }
+    return encoded.toString();
+  }
+
+  /**
+   * Encode a <code>Text</code> array of all of the terms to intersect on. The terms should match the columns in a one-to-one manner. Each term is Base64
+   * encoded and followed by a newline.
+   *
+   * @param terms
+   *          The terms to be encoded
+   * @return The newline-terminated Base64 lines, one per term
+   */
+  public static String encodeTermValues(Text[] terms) {
+    StringBuilder encoded = new StringBuilder();
+    for (Text term : terms) {
+      byte[] base64 = Base64.encodeBase64(TextUtil.getBytes(term));
+      encoded.append(new String(base64)).append('\n');
+    }
+    return encoded.toString();
+  }
+
+  /**
+   * Encode an array of <code>booleans</code> denoting which columns are NOT'ed. Every flag becomes one byte (1 for true, 0 for false) and the byte array is
+   * Base64 encoded.
+   *
+   * @param flags
+   *          The array of NOTs
+   * @return The Base64 encoded flags
+   */
+  public static String encodeBooleans(boolean[] flags) {
+    byte[] asBytes = new byte[flags.length];
+    int position = 0;
+    for (boolean flag : flags) {
+      asBytes[position++] = (byte) (flag ? 1 : 0);
+    }
+    return new String(Base64.encodeBase64(asBytes));
+  }
+
+  /**
+   * Decode the encoded columns into a <code>Text</code> array. The input is split on newlines and every line is Base64 decoded.
+   *
+   * @param columns
+   *          The Base64 encoded String of the columns
+   * @return One Text per encoded line
+   */
+  public static Text[] decodeColumns(String columns) {
+    String[] encodedLines = columns.split("\n");
+    Text[] decoded = new Text[encodedLines.length];
+    int position = 0;
+    for (String line : encodedLines) {
+      decoded[position++] = new Text(Base64.decodeBase64(line.getBytes()));
+    }
+    return decoded;
+  }
+
+  /**
+   * Decode the encoded terms into a <code>Text</code> array. The input is split on newlines and every line is Base64 decoded.
+   *
+   * @param terms
+   *          The Base64 encoded String of the terms
+   * @return One Text per encoded line
+   */
+  public static Text[] decodeTermValues(String terms) {
+    String[] encodedLines = terms.split("\n");
+    Text[] decoded = new Text[encodedLines.length];
+    int position = 0;
+    for (String line : encodedLines) {
+      decoded[position++] = new Text(Base64.decodeBase64(line.getBytes()));
+    }
+    return decoded;
+  }
+
+  /**
+   * Decode the encoded NOT flags into a <code>boolean</code> array. A decoded byte of 1 means true; any other byte means false.
+   *
+   * @param flags
+   *          The Base64 encoded flags, may be null
+   * @return The decoded flags, or null if <code>flags</code> is null
+   */
+  public static boolean[] decodeBooleans(String flags) {
+    boolean[] decoded = null;
+
+    // a null input stays null so the caller can tell that no flags were given
+    if (flags != null) {
+      byte[] raw = Base64.decodeBase64(flags.getBytes());
+      decoded = new boolean[raw.length];
+      for (int i = 0; i < raw.length; i++) {
+        decoded[i] = (raw[i] == 1);
+      }
+    }
+
+    return decoded;
+  }
+
+  /**
+   * Reads the encoded column families, terms and NOT flags from the options and builds one TermSource per term. The first source uses <code>source</code>
+   * itself; every further source uses a deep copy of it. If the first term is negated it is swapped with the first non-negated term, because the first source
+   * must not be a NOT term.
+   *
+   * @param source
+   *          The underlying iterator
+   * @param options
+   *          Must contain the columnFamilies and termValues options; notFlags is optional
+   * @param env
+   *          The environment handed to the deep copies of the source
+   * @throws IOException
+   * @throws IllegalArgumentException
+   *           if a required option is missing, fewer than two terms are given, the option arrays differ in length, or every term is negated
+   */
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("In AndIterator.init()");
+    }
+
+    // Fail with a clear message instead of a bare NullPointerException inside the decoders.
+    String encodedColumns = options.get(columnFamiliesOptionName);
+    String encodedTerms = options.get(termValuesOptionName);
+    if (encodedColumns == null || encodedTerms == null) {
+      throw new IllegalArgumentException("AndIterator requires the " + columnFamiliesOptionName + " and " + termValuesOptionName + " options");
+    }
+
+    Text[] dataLocations = decodeColumns(encodedColumns);
+    Text[] terms = decodeTermValues(encodedTerms);
+    boolean[] notFlags = decodeBooleans(options.get(notFlagsOptionName));
+
+    if (terms.length < 2) {
+      throw new IllegalArgumentException("AndIterator requires two or more columns families");
+    }
+
+    // No flags given: nothing is negated (a new boolean[] is already all false).
+    if (notFlags == null) {
+      notFlags = new boolean[terms.length];
+    }
+
+    // The three arrays are indexed in parallel below. A mismatch used to end in an
+    // ArrayIndexOutOfBoundsException or in terms being silently left out of the intersection.
+    if (dataLocations.length != terms.length || notFlags.length != terms.length) {
+      throw new IllegalArgumentException("AndIterator requires the same number of column families (" + dataLocations.length + "), terms (" + terms.length
+          + ") and not flags (" + notFlags.length + ")");
+    }
+
+    // Make sure that the first dataLocation/Term is not a NOT by swapping it with a later dataLocation/Term
+    if (notFlags[0]) {
+      for (int i = 1; i < notFlags.length; i++) {
+        if (!notFlags[i]) {
+          // The arrays are local and freshly decoded, so swapping the references is enough.
+          Text swappedTerm = terms[0];
+          terms[0] = terms[i];
+          terms[i] = swappedTerm;
+
+          Text swappedLocation = dataLocations[0];
+          dataLocations[0] = dataLocations[i];
+          dataLocations[i] = swappedLocation;
+
+          notFlags[0] = false;
+          notFlags[i] = true;
+          break;
+        }
+      }
+
+      if (notFlags[0]) {
+        throw new IllegalArgumentException("AndIterator requires at least one column family without not");
+      }
+    }
+
+    // Build up the array of sources that are to be intersected
+    sources = new TermSource[dataLocations.length];
+    sources[0] = new TermSource(source, dataLocations[0], terms[0]);
+    for (int i = 1; i < dataLocations.length; i++) {
+      sources[i] = new TermSource(source.deepCopy(env), dataLocations[i], terms[i], notFlags[i]);
+    }
+
+    sourcesCount = dataLocations.length;
+  }
+
+ /**
+ * Seeks every term source for the given range and then advances to the first intersection.
+ * <p>
+ * A copy of <code>range</code> is kept in <code>overallRange</code>, the current row and document id are reset, and the column families and the
+ * inclusive flag are remembered in fields. If the top key found afterwards lies outside <code>overallRange</code> it is cleared.
+ *
+ * @param range
+ * the range to seek to
+ * @param seekColumnFamilies
+ * stored in a field and passed unchanged to each source's seek
+ * @param inclusive
+ * stored in a field and passed unchanged to each source's seek
+ * @throws IOException
+ * declared in the signature; presumably propagated from the underlying sources
+ */
+ public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("In AndIterator.seek()");
+ log.debug("AndIterator.seek Given range => " + range);
+ }
+ // if (firstSeek) {
+ overallRange = new Range(range);
+ // firstSeek = false;
+ // }
+ // NOTE(review): parentEndRow keeps its previous value when the new range has no end key/row -- confirm this is intended
+ if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
+ this.parentEndRow = range.getEndKey().getRow();
+ }
+
+ // overallRange = new Range(range);
+ currentRow = new Text();
+ currentDocID.set(emptyByteArray);
+
+ this.seekColumnFamilies = seekColumnFamilies;
+ this.inclusive = inclusive;
+
+ // seek each of the sources to the right column family within the row given by key
+ for (int i = 0; i < sourcesCount; i++) {
+ Key sourceKey;
+ if (range.getStartKey() != null) {
+ // Build a key with the DocID if one is given
+ // NOTE(review): if getColumnQualifier() never returns null, the else branch below is unreachable -- confirm
+ if (range.getStartKey().getColumnQualifier() != null) {
+ sourceKey = buildKey(getPartition(range.getStartKey()), (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation,
+ (sources[i].term == null) ? nullText : new Text(sources[i].term + "\0" + range.getStartKey().getColumnQualifier()));
+ } // Build a key with just the term.
+ else {
+ sourceKey = buildKey(getPartition(range.getStartKey()), (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation,
+ (sources[i].term == null) ? nullText : sources[i].term);
+ }
+
+ // the source is seeked from the built key (inclusive) with no end key; the end of the range is checked after the intersection below
+ sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive);
+ } else {
+ // no start key: hand the range to the source as it was given
+ sources[i].iter.seek(range, seekColumnFamilies, inclusive);
+ }
+ }
+
+ advanceToIntersection();
+
+ // drop a top key that lies outside of the range we were asked for
+ if (hasTop()) {
+ if (overallRange != null && !overallRange.contains(topKey)) {
+ topKey = null;
+ if (log.isDebugEnabled()) {
+ log.debug("seek, topKey is outside of overall range: " + overallRange);
}
-
- sources[sourcesCount] = new TermSource(source.deepCopy(env), dataLocation, term, notFlag);
- sourcesCount++;
+ }
}
-
- public boolean jump(Key jumpKey) throws IOException {
+ }
+
+  /**
+   * Adds a term source that has no data location. Delegates to the five-argument overload with a <code>null</code> data location.
+   *
+   * @param source
+   *          the iterator to read the term from
+   * @param env
+   *          the iterator environment handed to the five-argument overload
+   * @param term
+   *          the term value
+   * @param notFlag
+   *          the NOT flag for the term
+   */
+  public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text term, boolean notFlag) {
+    final Text noDataLocation = null;
+    addSource(source, env, noDataLocation, term, notFlag);
+  }
+
+  /**
+   * Appends a term source built from a deep copy of <code>source</code>.
+   * <p>
+   * The <code>sources</code> array is replaced by one that is a single slot longer (or created with one slot if it did not exist yet); existing
+   * entries are carried over through the <code>TermSource</code> copy constructor. The new source is stored at index <code>sourcesCount</code>,
+   * which is then incremented.
+   *
+   * @param source
+   *          the iterator to deep-copy for the new term
+   * @param env
+   *          passed to <code>deepCopy</code>
+   * @param dataLocation
+   *          the data location for the term, may be <code>null</code>
+   * @param term
+   *          the term value
+   * @param notFlag
+   *          the NOT flag for the term
+   */
+  public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text dataLocation, Text term, boolean notFlag) {
+    // Make room for the added source.
+    // TODO: Should we change this to an ArrayList so that we can just add() ?
+    final TermSource[] grown;
+    if (sources != null) {
+      grown = new TermSource[sources.length + 1];
+      for (int i = 0; i < sources.length; i++) {
+        // TODO: Do I need to call new here? or can I just re-use the term?
+        grown[i] = new TermSource(sources[i]);
+      }
+    } else {
+      grown = new TermSource[1];
+    }
+    sources = grown;
+
+    sources[sourcesCount] = new TermSource(source.deepCopy(env), dataLocation, term, notFlag);
+    sourcesCount++;
+  }
+
+ /**
+ * Moves this iterator forward to <code>jumpKey</code> if it is currently positioned before it.
+ * <p>
+ * Returns <code>false</code> without moving when the row of <code>jumpKey</code> is past <code>parentEndRow</code>. With no top key, the iterator
+ * is re-seeked starting at <code>jumpKey</code>. Otherwise the rows are compared: when this iterator is already ahead nothing happens, when it is
+ * behind it is re-seeked to the start of the jump row, and when the rows match the UIDs of the two keys decide whether the sources are advanced.
+ *
+ * @param jumpKey
+ * the key to jump to
+ * @return <code>false</code> if the jump row is past <code>parentEndRow</code>; otherwise the value of <code>hasTop()</code> after any move
+ * @throws IOException
+ * declared in the signature; presumably propagated from the seeks performed here
+ */
+ public boolean jump(Key jumpKey) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("jump: " + jumpKey);
+ }
+
+ // is the jumpKey outside my overall range?
+ if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
+ // can't go there.
+ if (log.isDebugEnabled()) {
+ log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
+ }
+ return false;
+ }
+
+ if (!hasTop()) {
+ // TODO: will need to add current/last row if you want to measure if
+ // we don't have topkey because we hit end of tablet.
+
+ if (log.isDebugEnabled()) {
+ log.debug("jump called, but topKey is null, must need to move to next row");
+ }
+ // call seek with the jumpKey
+
+ Key endKey = null;
+ if (parentEndRow != null) {
+ endKey = new Key(parentEndRow);
+ }
+ Range newRange = new Range(jumpKey, true, endKey, false);
+ this.seek(newRange, seekColumnFamilies, false);
+ // the parent seek should account for the endKey range check.
+ return hasTop();
+ } else {
+
+ int comp = this.topKey.getRow().compareTo(jumpKey.getRow());
+ // compare rows
+ if (comp > 0) {
if (log.isDebugEnabled()) {
- log.debug("jump: " + jumpKey);
+ log.debug("jump, our row is ahead of jumpKey.");
+ log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + topKey.getRow() + " parentEndRow" + parentEndRow);
}
-
- // is the jumpKey outside my overall range?
- if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
- //can't go there.
- if (log.isDebugEnabled()) {
- log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
- }
- return false;
+ return hasTop(); // do nothing, we're ahead of jumpKey row
+ } else if (comp < 0) { // a row behind jump key, need to move forward
+
+ if (log.isDebugEnabled()) {
+ log.debug("II jump, row jump");
}
-
- if (!hasTop()) {
- //TODO: will need to add current/last row if you want to measure if
- //we don't have topkey because we hit end of tablet.
-
- if (log.isDebugEnabled()) {
- log.debug("jump called, but topKey is null, must need to move to next row");
- }
- // call seek with the jumpKey
-
- Key endKey = null;
- if (parentEndRow != null) {
- endKey = new Key(parentEndRow);
- }
- Range newRange = new Range(jumpKey, true, endKey, false);
- this.seek(newRange, seekColumnFamilies, false);
- // the parent seek should account for the endKey range check.
- return hasTop();
- } else {
-
- int comp = this.topKey.getRow().compareTo(jumpKey.getRow());
- //compare rows
- if (comp > 0) {
- if (log.isDebugEnabled()) {
- log.debug("jump, our row is ahead of jumpKey.");
- log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + topKey.getRow() + " parentEndRow" + parentEndRow);
- }
- return hasTop(); // do nothing, we're ahead of jumpKey row
- } else if (comp < 0) { //a row behind jump key, need to move forward
-
- if (log.isDebugEnabled()) {
- log.debug("II jump, row jump");
- }
- Key endKey = null;
- if (parentEndRow != null) {
- endKey = new Key(parentEndRow);
- }
- Key sKey = new Key(jumpKey.getRow());
- Range fake = new Range(sKey, true, endKey, false);
- this.seek(fake, this.seekColumnFamilies, false);
- return hasTop();
- } else {
- //need to check uid
- String myUid = getUID(this.topKey);
- String jumpUid = getUID(jumpKey);
- if (log.isDebugEnabled()) {
- if (myUid == null) {
- log.debug("myUid is null");
- } else {
- log.debug("myUid: " + myUid);
- }
-
- if (jumpUid == null) {
- log.debug("jumpUid is null");
- } else {
- log.debug("jumpUid: " + jumpUid);
- }
- }
-
- int ucomp = myUid.compareTo(jumpUid);
- if (ucomp < 0) { //need to move all sources forward
- if (log.isDebugEnabled()) {
- log.debug("jump, uid jump");
- }
- // move one, and then advanceToIntersection will move the rest.
- Text row = jumpKey.getRow();
- String cq = topKey.getColumnQualifier().toString();
- cq = cq.replaceAll(myUid, jumpUid);
-
- Key startKey = buildKey(row, topKey.getColumnFamily(), new Text(cq));
- Range range = new Range(startKey, true, null, false);
- sources[0].iter.seek(range, seekColumnFamilies, true);
- advanceToIntersection();
-
- // make sure it is in the range if we have one.
- if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
- topKey = null;
- }
- if (log.isDebugEnabled() && hasTop()) {
- log.debug("jump, topKey is now: " + topKey);
- }
- return hasTop();
-
-
- }//else do nothing
- if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
- topKey = null;
- }
- return hasTop();
- }
+ Key endKey = null;
+ if (parentEndRow != null) {
+ endKey = new Key(parentEndRow);
+ }
+ // restart from the beginning of the jump row; seek() re-positions every source
+ Key sKey = new Key(jumpKey.getRow());
+ Range fake = new Range(sKey, true, endKey, false);
+ this.seek(fake, this.seekColumnFamilies, false);
+ return hasTop();
+ } else {
+ // need to check uid
+ String myUid = getUID(this.topKey);
+ String jumpUid = getUID(jumpKey);
+ if (log.isDebugEnabled()) {
+ if (myUid == null) {
+ log.debug("myUid is null");
+ } else {
+ log.debug("myUid: " + myUid);
+ }
+
+ if (jumpUid == null) {
+ log.debug("jumpUid is null");
+ } else {
+ log.debug("jumpUid: " + jumpUid);
+ }
+ }
+
+ // NOTE(review): the logging above allows for null UIDs, but compareTo below would throw a NullPointerException for them -- confirm getUID never returns null here
+ int ucomp = myUid.compareTo(jumpUid);
+ if (ucomp < 0) { // need to move all sources forward
+ if (log.isDebugEnabled()) {
+ log.debug("jump, uid jump");
+ }
+ // move one, and then advanceToIntersection will move the rest.
+ Text row = jumpKey.getRow();
+ String cq = topKey.getColumnQualifier().toString();
+ // literal substitution: replaceAll would read myUid as a regular expression and jumpUid as a replacement pattern
+ cq = cq.replace(myUid, jumpUid);
+
+ Key startKey = buildKey(row, topKey.getColumnFamily(), new Text(cq));
+ Range range = new Range(startKey, true, null, false);
+ sources[0].iter.seek(range, seekColumnFamilies, true);
+ advanceToIntersection();
+
+ // make sure it is in the range if we have one.
+ if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
+ topKey = null;
+ }
+ if (log.isDebugEnabled() && hasTop()) {
+ log.debug("jump, topKey is now: " + topKey);
+ }
+ return hasTop();
+
+ }// else do nothing
+ if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
+ topKey = null;
}
+ return hasTop();
+ }
}
+ }
}