You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2011/12/27 19:19:44 UTC
svn commit: r1224966 [7/10] - in /incubator/accumulo/branches/1.4: ./
contrib/accumulo_sample/
contrib/accumulo_sample/ingest/src/main/java/aggregator/
contrib/accumulo_sample/ingest/src/main/java/ingest/
contrib/accumulo_sample/ingest/src/test/java/ag...
Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java Tue Dec 27 18:19:43 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package iterator;
import java.io.IOException;
@@ -29,7 +29,6 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
-import org.apache.commons.jexl2.Expression;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -37,317 +36,307 @@ import org.apache.log4j.Logger;
import util.FieldIndexKeyParser;
public class UniqFieldNameValueIterator extends WrappingIterator {
-
- protected static final Logger log = Logger.getLogger(UniqFieldNameValueIterator.class);
- // Wrapping iterator only accesses its private source in setSource and getSource
- // Since this class overrides these methods, it's safest to keep the source declaration here
- private SortedKeyValueIterator<Key, Value> source;
- private FieldIndexKeyParser keyParser;
- private Key topKey = null;
- private Value topValue = null;
- private Range overallRange = null;
- private Range currentSubRange;
- private Text fieldName = null;
- private String fieldNameString = null;
- private Text fieldValueLowerBound = null;
- private Text fieldValueUpperBound = null;
- private String operator = null;
- private Expression expr = null;
- private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
- private static final String NULL_BYTE = "\0";
- private static final String ONE_BYTE = "\1";
- private boolean multiRow = false;
- private boolean seekInclusive = false;
-
- //-------------------------------------------------------------------------
- //------------- Static Methods
- public static void setLogLevel(Level l) {
- log.setLevel(l);
- }
-
- //-------------------------------------------------------------------------
- //------------- Constructors
- public UniqFieldNameValueIterator(Text fName, Text fValLower, Text fValUpper) {
- this.fieldName = fName;
- this.fieldNameString = fieldName.toString();
- this.fieldValueLowerBound = fValLower;
- this.fieldValueUpperBound = fValUpper;
- keyParser = createDefaultKeyParser();
-
- }
-
- public UniqFieldNameValueIterator(UniqFieldNameValueIterator other, IteratorEnvironment env) {
- source = other.getSource().deepCopy(env);
- //Set a default KeyParser
- keyParser = createDefaultKeyParser();
- }
-
- //-------------------------------------------------------------------------
- //------------- Overrides
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
- source = super.getSource();
- }
-
- @Override
- protected void setSource(SortedKeyValueIterator<Key, Value> source) {
- this.source = source;
- }
-
- @Override
- protected SortedKeyValueIterator<Key, Value> getSource() {
- return source;
- }
-
- @Override
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- return new UniqFieldNameValueIterator(this, env);
- }
-
- @Override
- public Key getTopKey() {
- return this.topKey;
- }
-
- @Override
- public Value getTopValue() {
- return this.topValue;
- }
-
- @Override
- public boolean hasTop() {
- return (topKey != null);
- }
-
- @Override
- public void next() throws IOException {
+
+ protected static final Logger log = Logger.getLogger(UniqFieldNameValueIterator.class);
+ // Wrapping iterator only accesses its private source in setSource and getSource
+ // Since this class overrides these methods, it's safest to keep the source declaration here
+ private SortedKeyValueIterator<Key,Value> source;
+ private FieldIndexKeyParser keyParser;
+ private Key topKey = null;
+ private Value topValue = null;
+ private Range overallRange = null;
+ private Range currentSubRange;
+ private Text fieldName = null;
+ private Text fieldValueLowerBound = null;
+ private Text fieldValueUpperBound = null;
+ private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
+ private static final String ONE_BYTE = "\1";
+ private boolean multiRow = false;
+ private boolean seekInclusive = false;
+
+ // -------------------------------------------------------------------------
+ // ------------- Static Methods
+ public static void setLogLevel(Level l) {
+ log.setLevel(l);
+ }
+
+ // -------------------------------------------------------------------------
+ // ------------- Constructors
+ public UniqFieldNameValueIterator(Text fName, Text fValLower, Text fValUpper) {
+ this.fieldName = fName;
+ this.fieldValueLowerBound = fValLower;
+ this.fieldValueUpperBound = fValUpper;
+ keyParser = createDefaultKeyParser();
+
+ }
+
+ public UniqFieldNameValueIterator(UniqFieldNameValueIterator other, IteratorEnvironment env) {
+ source = other.getSource().deepCopy(env);
+ // Set a default KeyParser
+ keyParser = createDefaultKeyParser();
+ }
+
+ // -------------------------------------------------------------------------
+ // ------------- Overrides
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ source = super.getSource();
+ }
+
+ @Override
+ protected void setSource(SortedKeyValueIterator<Key,Value> source) {
+ this.source = source;
+ }
+
+ @Override
+ protected SortedKeyValueIterator<Key,Value> getSource() {
+ return source;
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new UniqFieldNameValueIterator(this, env);
+ }
+
+ @Override
+ public Key getTopKey() {
+ return this.topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return this.topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return (topKey != null);
+ }
+
+ @Override
+ public void next() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("next()");
+ }
+ if (!source.hasTop()) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+
+ Key currentKey = topKey;
+ keyParser.parse(topKey);
+ String fValue = keyParser.getFieldValue();
+
+ Text currentRow = currentKey.getRow();
+ Text currentFam = currentKey.getColumnFamily();
+
+ if (overallRange.getEndKey() != null && overallRange.getEndKey().getRow().compareTo(currentRow) < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("next, overall endRow: " + overallRange.getEndKey().getRow() + " currentRow: " + currentRow);
+ }
+ topKey = null;
+ topValue = null;
+ return;
+ }
+
+ if (fValue.compareTo(this.fieldValueUpperBound.toString()) > 0) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ Key followingKey = new Key(currentKey.getRow(), this.fieldName, new Text(fValue + ONE_BYTE));
+ if (log.isDebugEnabled()) {
+ log.debug("next, followingKey to seek on: " + followingKey);
+ }
+ Range r = new Range(followingKey, followingKey);
+ source.seek(r, EMPTY_COL_FAMS, false);
+ while (true) {
+ if (!source.hasTop()) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+
+ Key k = source.getTopKey();
+ if (!overallRange.contains(k)) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("next(), key: " + k + " subrange: " + this.currentSubRange);
+ }
+ // if (this.currentSubRange.contains(k)) {
+ keyParser.parse(k);
+ Text currentVal = new Text(keyParser.getFieldValue());
+ if (k.getRow().equals(currentRow) && k.getColumnFamily().equals(currentFam) && currentVal.compareTo(fieldValueUpperBound) <= 0) {
+ topKey = k;
+ topValue = source.getTopValue();
+ return;
+
+ } else { // need to move to next row.
+ if (this.overallRange.contains(k) && this.multiRow) {
+ // need to find the next sub range
+ // STEPS
+ // 1. check if you moved past your current row on last call to next
+ // 2. figure out next row
+ // 3. build new start key with lowerbound fvalue
+ // 4. seek the source
+ // 5. test the subrange.
+ if (k.getRow().equals(currentRow)) {
+ // get next row
+ currentRow = getNextRow();
+ if (currentRow == null) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ } else {
+ // i'm already in the next row
+ currentRow = source.getTopKey().getRow();
+ }
+
+ // build new startKey
+ Key sKey = new Key(currentRow, fieldName, fieldValueLowerBound);
+ Key eKey = new Key(currentRow, fieldName, fieldValueUpperBound);
+ currentSubRange = new Range(sKey, eKey);
+ source.seek(currentSubRange, EMPTY_COL_FAMS, seekInclusive);
+
+ } else { // not multi-row or outside overall range, we're done
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ }
+
+ }
+
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("seek, range: " + range);
+ }
+ this.overallRange = range;
+ this.seekInclusive = inclusive;
+ source.seek(range, EMPTY_COL_FAMS, inclusive);
+ topKey = null;
+ topValue = null;
+ Key sKey;
+ Key eKey;
+
+ if (range.isInfiniteStartKey()) {
+ sKey = source.getTopKey();
+ if (sKey == null) {
+ return;
+ }
+ } else {
+ sKey = range.getStartKey();
+ }
+
+ if (range.isInfiniteStopKey()) {
+ eKey = null;
+ this.multiRow = true; // assume we will go to the end of the tablet.
+ } else {
+ eKey = range.getEndKey();
+ if (sKey.getRow().equals(eKey.getRow())) {
+ this.multiRow = false;
+ } else {
+ this.multiRow = true;
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("seek, multiRow:" + multiRow + " range:" + range);
+ }
+
+ /*
+ * NOTE: If the seek range spans multiple rows, we are only interested in the fieldName:fieldValue subranges in each row. Keys will exist in the
+ * overallRange that we will want to skip over so we need to create subranges per row so we don't have to examine every key in between.
+ */
+
+ Text sRow = sKey.getRow();
+ Key ssKey = new Key(sRow, this.fieldName, this.fieldValueLowerBound);
+ Key eeKey = new Key(sRow, this.fieldName, this.fieldValueUpperBound);
+ this.currentSubRange = new Range(ssKey, eeKey);
+
+ if (log.isDebugEnabled()) {
+ log.debug("seek, currentSubRange: " + currentSubRange);
+ }
+ source.seek(this.currentSubRange, columnFamilies, inclusive);
+ // cycle until we find a valid topKey, or we get ejected b/c we hit the
+ // end of the tablet or exceeded the overallRange.
+ while (topKey == null) {
+ if (source.hasTop()) {
+ Key k = source.getTopKey();
if (log.isDebugEnabled()) {
- log.debug("next()");
+ log.debug("seek, source.topKey: " + k);
}
- if (!source.hasTop()) {
- topKey = null;
- topValue = null;
- return;
- }
-
- Key currentKey = topKey;
- keyParser.parse(topKey);
- String fValue = keyParser.getFieldValue();
-
- Text currentRow = currentKey.getRow();
- Text currentFam = currentKey.getColumnFamily();
-
- if(overallRange.getEndKey() != null && overallRange.getEndKey().getRow().compareTo(currentRow) < 0){
- if(log.isDebugEnabled()){
- log.debug("next, overall endRow: "+overallRange.getEndKey().getRow()+" currentRow: "+currentRow);
+ if (currentSubRange.contains(k)) {
+ topKey = k;
+ topValue = source.getTopValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug("seek, source has top in valid range");
+ }
+
+ } else { // outside of subRange.
+ // if multiRow mode, get the next row and seek to it
+ if (multiRow && overallRange.contains(k)) {
+
+ Key fKey = sKey.followingKey(PartialKey.ROW);
+ Range fRange = new Range(fKey, eKey);
+ source.seek(fRange, columnFamilies, inclusive);
+
+ if (source.hasTop()) {
+ Text row = source.getTopKey().getRow();
+ Key nKey = new Key(row, this.fieldName, this.fieldValueLowerBound);
+ this.currentSubRange = new Range(nKey, eKey);
+ sKey = this.currentSubRange.getStartKey();
+ Range nextRange = new Range(sKey, eKey);
+ source.seek(nextRange, columnFamilies, inclusive);
+ } else {
+ topKey = null;
+ topValue = null;
+ return;
}
+
+ } else { // not multi row & outside range, we're done.
topKey = null;
topValue = null;
return;
+ }
}
-
- if (fValue.compareTo(this.fieldValueUpperBound.toString()) > 0) {
- topKey = null;
- topValue = null;
- return;
- }
- Key followingKey = new Key(currentKey.getRow(), this.fieldName, new Text(fValue + ONE_BYTE));
- if (log.isDebugEnabled()) {
- log.debug("next, followingKey to seek on: " + followingKey);
- }
- Range r = new Range(followingKey, followingKey);
- source.seek(r, EMPTY_COL_FAMS, false);
- while (true) {
- if (!source.hasTop()) {
- topKey = null;
- topValue = null;
- return;
- }
-
- Key k = source.getTopKey();
- if(!overallRange.contains(k)){
- topKey = null;
- topValue = null;
- return;
- }
- if (log.isDebugEnabled()) {
- log.debug("next(), key: " + k + " subrange: " + this.currentSubRange);
- }
- //if (this.currentSubRange.contains(k)) {
- keyParser.parse(k);
- Text currentVal = new Text(keyParser.getFieldValue());
- if(k.getRow().equals(currentRow)
- && k.getColumnFamily().equals(currentFam)
- && currentVal.compareTo(fieldValueUpperBound) <= 0 ){
- topKey = k;
- topValue = source.getTopValue();
- return;
-
- } else { // need to move to next row.
- if (this.overallRange.contains(k) && this.multiRow) {
- // need to find the next sub range
- //STEPS
- //1. check if you moved past your current row on last call to next
- //2. figure out next row
- //3. build new start key with lowerbound fvalue
- //4. seek the source
- //5. test the subrange.
- if (k.getRow().equals(currentRow)) {
- //get next row
- currentRow = getNextRow();
- if (currentRow == null) {
- topKey = null;
- topValue = null;
- return;
- }
- } else {
- //i'm already in the next row
- currentRow = source.getTopKey().getRow();
- }
-
- //build new startKey
- Key sKey = new Key(currentRow, fieldName, fieldValueLowerBound);
- Key eKey = new Key(currentRow, fieldName, fieldValueUpperBound);
- currentSubRange = new Range(sKey, eKey);
- source.seek(currentSubRange, EMPTY_COL_FAMS, seekInclusive);
-
- } else { // not multi-row or outside overall range, we're done
- topKey = null;
- topValue = null;
- return;
- }
- }
-
- }
-
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("seek, range: " + range);
- }
- this.overallRange = range;
- this.seekInclusive = inclusive;
- source.seek(range, EMPTY_COL_FAMS, inclusive);
+ } else { // source does not have top, we're done
topKey = null;
topValue = null;
- Key sKey;
- Key eKey;
-
- if (range.isInfiniteStartKey()) {
- sKey = source.getTopKey();
- if (sKey == null) {
- return;
- }
- } else {
- sKey = range.getStartKey();
- }
-
- if (range.isInfiniteStopKey()) {
- eKey = null;
- this.multiRow = true; // assume we will go to the end of the tablet.
- } else {
- eKey = range.getEndKey();
- if (sKey.getRow().equals(eKey.getRow())) {
- this.multiRow = false;
- } else {
- this.multiRow = true;
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("seek, multiRow:" + multiRow + " range:" + range);
- }
-
- /*
- * NOTE: If the seek range spans multiple rows, we are only interested
- * in the fieldName:fieldValue subranges in each row. Keys will
- * exist in the overallRange that we will want to skip over so we need
- * to create subranges per row so we don't have to examine every key in
- * between.
- */
-
- Text sRow = sKey.getRow();
- Key ssKey = new Key(sRow, this.fieldName, this.fieldValueLowerBound);
- Key eeKey = new Key(sRow, this.fieldName, this.fieldValueUpperBound);
- this.currentSubRange = new Range(ssKey, eeKey);
-
- if (log.isDebugEnabled()) {
- log.debug("seek, currentSubRange: " + currentSubRange);
- }
- source.seek(this.currentSubRange, columnFamilies, inclusive);
- // cycle until we find a valid topKey, or we get ejected b/c we hit the
- // end of the tablet or exceeded the overallRange.
- while (topKey == null) {
- if (source.hasTop()) {
- Key k = source.getTopKey();
- if (log.isDebugEnabled()) {
- log.debug("seek, source.topKey: " + k);
- }
- if (currentSubRange.contains(k)) {
- topKey = k;
- topValue = source.getTopValue();
-
- if (log.isDebugEnabled()) {
- log.debug("seek, source has top in valid range");
- }
-
- } else { //outside of subRange.
- // if multiRow mode, get the next row and seek to it
- if (multiRow && overallRange.contains(k)) {
-
- Key fKey = sKey.followingKey(PartialKey.ROW);
- Range fRange = new Range(fKey, eKey);
- source.seek(fRange, columnFamilies, inclusive);
-
- if (source.hasTop()) {
- Text row = source.getTopKey().getRow();
- Key nKey = new Key(row, this.fieldName, this.fieldValueLowerBound);
- this.currentSubRange = new Range(nKey, eKey);
- sKey = this.currentSubRange.getStartKey();
- Range nextRange = new Range(sKey, eKey);
- source.seek(nextRange, columnFamilies, inclusive);
- } else {
- topKey = null;
- topValue = null;
- return;
- }
-
- } else { // not multi row & outside range, we're done.
- topKey = null;
- topValue = null;
- return;
- }
- }
- } else { // source does not have top, we're done
- topKey = null;
- topValue = null;
- return;
- }
- }
-
+ return;
+ }
}
-
- //-------------------------------------------------------------------------
- //------------- Internal Methods
- private FieldIndexKeyParser createDefaultKeyParser() {
- FieldIndexKeyParser parser = new FieldIndexKeyParser();
- return parser;
- }
-
- private Text getNextRow() throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("getNextRow()");
- }
- Key fakeKey = new Key(source.getTopKey().followingKey(PartialKey.ROW));
- Range fakeRange = new Range(fakeKey, fakeKey);
- source.seek(fakeRange, EMPTY_COL_FAMS, false);
- if (source.hasTop()) {
- return source.getTopKey().getRow();
- } else {
- return null;
- }
+
+ }
+
+ // -------------------------------------------------------------------------
+ // ------------- Internal Methods
+ private FieldIndexKeyParser createDefaultKeyParser() {
+ FieldIndexKeyParser parser = new FieldIndexKeyParser();
+ return parser;
+ }
+
+ private Text getNextRow() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("getNextRow()");
+ }
+ Key fakeKey = new Key(source.getTopKey().followingKey(PartialKey.ROW));
+ Range fakeRange = new Range(fakeKey, fakeKey);
+ source.seek(fakeRange, EMPTY_COL_FAMS, false);
+ if (source.hasTop()) {
+ return source.getTopKey().getRow();
+ } else {
+ return null;
}
+ }
}