You are viewing a plain-text version of this content. The canonical version is available from the original mail-archive page (the hyperlink was not preserved in this plain-text copy).
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/11/26 16:49:55 UTC
[07/40] ACCUMULO-600 removed wikisearch from trunk
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/ReadAheadIterator.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/ReadAheadIterator.java b/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/ReadAheadIterator.java
deleted file mode 100644
index 880ae40..0000000
--- a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/ReadAheadIterator.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.wikisearch.iterator;
-
-import java.io.IOException;
-import java.lang.Thread.State;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.OptionDescriber;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-
-/**
- * This iterator takes the source iterator (the one below it in the iterator stack) and puts it in a background thread. The background thread continues
- * processing and fills a queue with the Keys and Values from the source iterator. When seek() is called on this iterator, it pauses the background thread,
- * clears the queue, calls seek() on the source iterator, then resumes the thread filling the queue.
- *
- * Users of this iterator can set the queue size, default is five elements. Users must be aware of the potential for OutOfMemory errors when using this iterator
- * with large queue sizes or large objects. This iterator copies the Key and Value from the source iterator and puts them into the queue.
- *
- * This iterator introduces some parallelism into the server side iterator stack. One use case for this would be when an iterator takes a relatively long time
- * to process each K,V pair and causes the iterators above it to wait. By putting the longer running iterator in a background thread we should be able to
- * achieve greater throughput.
- *
- * NOTE: Experimental!
- *
- */
-public class ReadAheadIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
-
- private static Logger log = Logger.getLogger(ReadAheadIterator.class);
-
- public static final String QUEUE_SIZE = "queue.size";
-
- public static final String TIMEOUT = "timeout";
-
- private static final QueueElement noMoreDataElement = new QueueElement();
-
- private int queueSize = 5;
-
- private int timeout = 60;
-
- /**
- *
- * Class to hold key and value from the producing thread.
- *
- */
- static class QueueElement {
- Key key = null;
- Value value = null;
-
- public QueueElement() {}
-
- public QueueElement(Key key, Value value) {
- super();
- this.key = new Key(key);
- this.value = new Value(value.get(), true);
- }
-
- public Key getKey() {
- return key;
- }
-
- public Value getValue() {
- return value;
- }
- }
-
- /**
- *
- * Thread that produces data from the source iterator and places the results in a queue.
- *
- */
- class ProducerThread extends ReentrantLock implements Runnable {
-
- private static final long serialVersionUID = 1L;
-
- private Exception e = null;
-
- private int waitTime = timeout;
-
- private SortedKeyValueIterator<Key,Value> sourceIter = null;
-
- public ProducerThread(SortedKeyValueIterator<Key,Value> source) {
- this.sourceIter = source;
- }
-
- public void run() {
- boolean hasMoreData = true;
- // Keep this thread running while there is more data to read
- // and items left in the queue to be read off.
- while (hasMoreData || queue.size() > 0) {
- try {
- // Acquire the lock, this will wait if the lock is being
- // held by the ReadAheadIterator.seek() method.
- this.lock();
- // Check to see if there is more data from the iterator below.
- hasMoreData = sourceIter.hasTop();
- // Break out of the loop if no more data.
- if (!hasMoreData)
- continue;
- // Put the next K,V onto the queue.
- try {
- QueueElement e = new QueueElement(sourceIter.getTopKey(), sourceIter.getTopValue());
- boolean inserted = false;
- try {
- inserted = queue.offer(e, this.waitTime, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- this.e = ie;
- break;
- }
- if (!inserted) {
- // Then we either got a timeout, set the error and break out of the loop
- this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
- break;
- }
- // Move the iterator to the next K,V for the next iteration of this loop
- sourceIter.next();
- } catch (Exception e) {
- this.e = e;
- log.error("Error calling next on source iterator", e);
- break;
- }
- } finally {
- this.unlock();
- }
- }
- // If we broke out of the loop because of an error, then don't put the marker on the queue, just to do end.
- if (!hasError()) {
- // Put the special end of data marker into the queue
- try {
- queue.put(noMoreDataElement);
- } catch (InterruptedException e) {
- this.e = e;
- log.error("Error putting End of Data marker onto queue");
- }
- }
- }
-
- public boolean hasError() {
- return (this.e != null);
- }
-
- public Exception getError() {
- return this.e;
- }
- }
-
- private SortedKeyValueIterator<Key,Value> source;
- private ArrayBlockingQueue<QueueElement> queue = null;
- private QueueElement currentElement = new QueueElement();
- private ProducerThread thread = null;
- private Thread t = null;
-
- protected ReadAheadIterator(ReadAheadIterator other, IteratorEnvironment env) {
- source = other.source.deepCopy(env);
- }
-
- public ReadAheadIterator() {}
-
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new ReadAheadIterator(this, env);
- }
-
- public Key getTopKey() {
- return currentElement.getKey();
- }
-
- public Value getTopValue() {
- return currentElement.getValue();
- }
-
- public boolean hasTop() {
- if (currentElement == noMoreDataElement)
- return false;
- return currentElement != null || queue.size() > 0 || source.hasTop();
- }
-
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- validateOptions(options);
- this.source = source;
- queue = new ArrayBlockingQueue<QueueElement>(queueSize);
- thread = new ProducerThread(this.source);
- t = new Thread(thread, "ReadAheadIterator-SourceThread");
- t.start();
- }
-
- /**
- * Populate the key and value
- */
- public void next() throws IOException {
- // Thread startup race condition, need to make sure that the
- // thread has started before we call this the first time.
- while (t.getState().equals(State.NEW)) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {}
- }
-
- if (t.getState().equals(State.TERMINATED)) {
- // Thread encountered an error.
- if (thread.hasError()) {
- // and it should
- throw new IOException("Background thread has died", thread.getError());
- }
- }
-
- // Pull an element off the queue, this will wait if there is no data yet.
- try {
- if (thread.hasError())
- throw new IOException("background thread has error", thread.getError());
-
- QueueElement nextElement = null;
- while (null == nextElement) {
- try {
- nextElement = queue.poll(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- // TODO: Do we need to do anything here?
- }
- if (null == nextElement) {
- // Then we have no data and timed out, check for error condition in the read ahead thread
- if (thread.hasError()) {
- throw new IOException("background thread has error", thread.getError());
- }
- }
- }
- currentElement = nextElement;
- } catch (IOException e) {
- throw new IOException("Error getting element from source iterator", e);
- }
- }
-
- /**
- * Seek to the next matching cell and call next to populate the key and value.
- */
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- if (t.isAlive()) {
- // Check for error
- if (thread.hasError())
- throw new IOException("background thread has error", thread.getError());
-
- try {
- // Acquire the lock, or wait until its unlocked by the producer thread.
- thread.lock();
- queue.clear();
- currentElement = null;
- source.seek(range, columnFamilies, inclusive);
- } finally {
- thread.unlock();
- }
- next();
- } else {
- throw new IOException("source iterator thread has died.");
- }
- }
-
- public IteratorOptions describeOptions() {
- Map<String,String> options = new HashMap<String,String>();
- options.put(QUEUE_SIZE, "read ahead queue size");
- options.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
- return new IteratorOptions(getClass().getSimpleName(), "Iterator that puts the source in another thread", options, null);
- }
-
- public boolean validateOptions(Map<String,String> options) {
- if (options.containsKey(QUEUE_SIZE))
- queueSize = Integer.parseInt(options.get(QUEUE_SIZE));
- if (options.containsKey(TIMEOUT))
- timeout = Integer.parseInt(options.get(TIMEOUT));
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/UniqFieldNameValueIterator.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/UniqFieldNameValueIterator.java b/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/UniqFieldNameValueIterator.java
deleted file mode 100644
index 4c7201d..0000000
--- a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/UniqFieldNameValueIterator.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.wikisearch.iterator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.WrappingIterator;
-import org.apache.accumulo.examples.wikisearch.util.FieldIndexKeyParser;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-
-public class UniqFieldNameValueIterator extends WrappingIterator {
-
- protected static final Logger log = Logger.getLogger(UniqFieldNameValueIterator.class);
- // Wrapping iterator only accesses its private source in setSource and getSource
- // Since this class overrides these methods, it's safest to keep the source declaration here
- private SortedKeyValueIterator<Key,Value> source;
- private FieldIndexKeyParser keyParser;
- private Key topKey = null;
- private Value topValue = null;
- private Range overallRange = null;
- private Range currentSubRange;
- private Text fieldName = null;
- private Text fieldValueLowerBound = null;
- private Text fieldValueUpperBound = null;
- private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
- private static final String ONE_BYTE = "\1";
- private boolean multiRow = false;
- private boolean seekInclusive = false;
-
- // -------------------------------------------------------------------------
- // ------------- Static Methods
- public static void setLogLevel(Level l) {
- log.setLevel(l);
- }
-
- // -------------------------------------------------------------------------
- // ------------- Constructors
- public UniqFieldNameValueIterator(Text fName, Text fValLower, Text fValUpper) {
- this.fieldName = fName;
- this.fieldValueLowerBound = fValLower;
- this.fieldValueUpperBound = fValUpper;
- keyParser = createDefaultKeyParser();
-
- }
-
- public UniqFieldNameValueIterator(UniqFieldNameValueIterator other, IteratorEnvironment env) {
- source = other.getSource().deepCopy(env);
- // Set a default KeyParser
- keyParser = createDefaultKeyParser();
- }
-
- // -------------------------------------------------------------------------
- // ------------- Overrides
- @Override
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
- source = super.getSource();
- }
-
- @Override
- protected void setSource(SortedKeyValueIterator<Key,Value> source) {
- this.source = source;
- }
-
- @Override
- protected SortedKeyValueIterator<Key,Value> getSource() {
- return source;
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new UniqFieldNameValueIterator(this, env);
- }
-
- @Override
- public Key getTopKey() {
- return this.topKey;
- }
-
- @Override
- public Value getTopValue() {
- return this.topValue;
- }
-
- @Override
- public boolean hasTop() {
- return (topKey != null);
- }
-
- @Override
- public void next() throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("next()");
- }
- if (!source.hasTop()) {
- topKey = null;
- topValue = null;
- return;
- }
-
- Key currentKey = topKey;
- keyParser.parse(topKey);
- String fValue = keyParser.getFieldValue();
-
- Text currentRow = currentKey.getRow();
- Text currentFam = currentKey.getColumnFamily();
-
- if (overallRange.getEndKey() != null && overallRange.getEndKey().getRow().compareTo(currentRow) < 0) {
- if (log.isDebugEnabled()) {
- log.debug("next, overall endRow: " + overallRange.getEndKey().getRow() + " currentRow: " + currentRow);
- }
- topKey = null;
- topValue = null;
- return;
- }
-
- if (fValue.compareTo(this.fieldValueUpperBound.toString()) > 0) {
- topKey = null;
- topValue = null;
- return;
- }
- Key followingKey = new Key(currentKey.getRow(), this.fieldName, new Text(fValue + ONE_BYTE));
- if (log.isDebugEnabled()) {
- log.debug("next, followingKey to seek on: " + followingKey);
- }
- Range r = new Range(followingKey, followingKey);
- source.seek(r, EMPTY_COL_FAMS, false);
- while (true) {
- if (!source.hasTop()) {
- topKey = null;
- topValue = null;
- return;
- }
-
- Key k = source.getTopKey();
- if (!overallRange.contains(k)) {
- topKey = null;
- topValue = null;
- return;
- }
- if (log.isDebugEnabled()) {
- log.debug("next(), key: " + k + " subrange: " + this.currentSubRange);
- }
- // if (this.currentSubRange.contains(k)) {
- keyParser.parse(k);
- Text currentVal = new Text(keyParser.getFieldValue());
- if (k.getRow().equals(currentRow) && k.getColumnFamily().equals(currentFam) && currentVal.compareTo(fieldValueUpperBound) <= 0) {
- topKey = k;
- topValue = source.getTopValue();
- return;
-
- } else { // need to move to next row.
- if (this.overallRange.contains(k) && this.multiRow) {
- // need to find the next sub range
- // STEPS
- // 1. check if you moved past your current row on last call to next
- // 2. figure out next row
- // 3. build new start key with lowerbound fvalue
- // 4. seek the source
- // 5. test the subrange.
- if (k.getRow().equals(currentRow)) {
- // get next row
- currentRow = getNextRow();
- if (currentRow == null) {
- topKey = null;
- topValue = null;
- return;
- }
- } else {
- // i'm already in the next row
- currentRow = source.getTopKey().getRow();
- }
-
- // build new startKey
- Key sKey = new Key(currentRow, fieldName, fieldValueLowerBound);
- Key eKey = new Key(currentRow, fieldName, fieldValueUpperBound);
- currentSubRange = new Range(sKey, eKey);
- source.seek(currentSubRange, EMPTY_COL_FAMS, seekInclusive);
-
- } else { // not multi-row or outside overall range, we're done
- topKey = null;
- topValue = null;
- return;
- }
- }
-
- }
-
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("seek, range: " + range);
- }
- this.overallRange = range;
- this.seekInclusive = inclusive;
- source.seek(range, EMPTY_COL_FAMS, inclusive);
- topKey = null;
- topValue = null;
- Key sKey;
- Key eKey;
-
- if (range.isInfiniteStartKey()) {
- sKey = source.getTopKey();
- if (sKey == null) {
- return;
- }
- } else {
- sKey = range.getStartKey();
- }
-
- if (range.isInfiniteStopKey()) {
- eKey = null;
- this.multiRow = true; // assume we will go to the end of the tablet.
- } else {
- eKey = range.getEndKey();
- if (sKey.getRow().equals(eKey.getRow())) {
- this.multiRow = false;
- } else {
- this.multiRow = true;
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("seek, multiRow:" + multiRow + " range:" + range);
- }
-
- /*
- * NOTE: If the seek range spans multiple rows, we are only interested in the fieldName:fieldValue subranges in each row. Keys will exist in the
- * overallRange that we will want to skip over so we need to create subranges per row so we don't have to examine every key in between.
- */
-
- Text sRow = sKey.getRow();
- Key ssKey = new Key(sRow, this.fieldName, this.fieldValueLowerBound);
- Key eeKey = new Key(sRow, this.fieldName, this.fieldValueUpperBound);
- this.currentSubRange = new Range(ssKey, eeKey);
-
- if (log.isDebugEnabled()) {
- log.debug("seek, currentSubRange: " + currentSubRange);
- }
- source.seek(this.currentSubRange, columnFamilies, inclusive);
- // cycle until we find a valid topKey, or we get ejected b/c we hit the
- // end of the tablet or exceeded the overallRange.
- while (topKey == null) {
- if (source.hasTop()) {
- Key k = source.getTopKey();
- if (log.isDebugEnabled()) {
- log.debug("seek, source.topKey: " + k);
- }
- if (currentSubRange.contains(k)) {
- topKey = k;
- topValue = source.getTopValue();
-
- if (log.isDebugEnabled()) {
- log.debug("seek, source has top in valid range");
- }
-
- } else { // outside of subRange.
- // if multiRow mode, get the next row and seek to it
- if (multiRow && overallRange.contains(k)) {
-
- Key fKey = sKey.followingKey(PartialKey.ROW);
- Range fRange = new Range(fKey, eKey);
- source.seek(fRange, columnFamilies, inclusive);
-
- if (source.hasTop()) {
- Text row = source.getTopKey().getRow();
- Key nKey = new Key(row, this.fieldName, this.fieldValueLowerBound);
- this.currentSubRange = new Range(nKey, eKey);
- sKey = this.currentSubRange.getStartKey();
- Range nextRange = new Range(sKey, eKey);
- source.seek(nextRange, columnFamilies, inclusive);
- } else {
- topKey = null;
- topValue = null;
- return;
- }
-
- } else { // not multi row & outside range, we're done.
- topKey = null;
- topValue = null;
- return;
- }
- }
- } else { // source does not have top, we're done
- topKey = null;
- topValue = null;
- return;
- }
- }
-
- }
-
- // -------------------------------------------------------------------------
- // ------------- Internal Methods
- private FieldIndexKeyParser createDefaultKeyParser() {
- FieldIndexKeyParser parser = new FieldIndexKeyParser();
- return parser;
- }
-
- private Text getNextRow() throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("getNextRow()");
- }
- Key fakeKey = new Key(source.getTopKey().followingKey(PartialKey.ROW));
- Range fakeRange = new Range(fakeKey, fakeKey);
- source.seek(fakeRange, EMPTY_COL_FAMS, false);
- if (source.hasTop()) {
- return source.getTopKey().getRow();
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/jexl/Arithmetic.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/jexl/Arithmetic.java b/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/jexl/Arithmetic.java
deleted file mode 100644
index c59f573..0000000
--- a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/jexl/Arithmetic.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.wikisearch.jexl;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.jexl2.JexlArithmetic;
-import org.apache.commons.lang.math.NumberUtils;
-
-public class Arithmetic extends JexlArithmetic {
-
- public Arithmetic(boolean lenient) {
- super(lenient);
- }
-
- /**
- * This method differs from the parent in that we are not calling String.matches() because it does not match on a newline. Instead we are handling this case.
- *
- * @param left
- * first value
- * @param right
- * second value
- * @return test result.
- */
- @Override
- public boolean matches(Object left, Object right) {
- if (left == null && right == null) {
- // if both are null L == R
- return true;
- }
- if (left == null || right == null) {
- // we know both aren't null, therefore L != R
- return false;
- }
- final String arg = left.toString();
- if (right instanceof java.util.regex.Pattern) {
- return ((java.util.regex.Pattern) right).matcher(arg).matches();
- } else {
- // return arg.matches(right.toString());
- Pattern p = Pattern.compile(right.toString(), Pattern.DOTALL);
- Matcher m = p.matcher(arg);
- return m.matches();
-
- }
- }
-
- /**
- * This method differs from the parent class in that we are going to try and do a better job of coercing the types. As a last resort we will do a string
- * comparison and try not to throw a NumberFormatException. The JexlArithmetic class performs coercion to a particular type if either the left or the right
- * match a known type. We will look at the type of the right operator and try to make the left of the same type.
- */
- @Override
- public boolean equals(Object left, Object right) {
- Object fixedLeft = fixLeft(left, right);
- return super.equals(fixedLeft, right);
- }
-
- @Override
- public boolean lessThan(Object left, Object right) {
- Object fixedLeft = fixLeft(left, right);
- return super.lessThan(fixedLeft, right);
- }
-
- protected Object fixLeft(Object left, Object right) {
-
- if (null == left || null == right)
- return left;
-
- if (!(right instanceof Number) && left instanceof Number) {
- right = NumberUtils.createNumber(right.toString());
- }
-
- if (right instanceof Number && left instanceof Number) {
- if (right instanceof Double)
- return ((Double) right).doubleValue();
- else if (right instanceof Float)
- return ((Float) right).floatValue();
- else if (right instanceof Long)
- return ((Long) right).longValue();
- else if (right instanceof Integer)
- return ((Integer) right).intValue();
- else if (right instanceof Short)
- return ((Short) right).shortValue();
- else if (right instanceof Byte)
- return ((Byte) right).byteValue();
- else
- return right;
- }
- if (right instanceof Number && left instanceof String) {
- Number num = NumberUtils.createNumber(left.toString());
- // Let's try to cast left as right's type.
- if (this.isFloatingPointNumber(right) && this.isFloatingPointNumber(left))
- return num;
- else if (this.isFloatingPointNumber(right))
- return num.doubleValue();
- else if (right instanceof Number)
- return num.longValue();
- } else if (right instanceof Boolean && left instanceof String) {
- if (left.equals("true") || left.equals("false"))
- return Boolean.parseBoolean(left.toString());
-
- Number num = NumberUtils.createNumber(left.toString());
- if (num.intValue() == 1)
- return (Boolean) true;
- else if (num.intValue() == 0)
- return (Boolean) false;
- }
- return left;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/AbstractQueryLogic.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/AbstractQueryLogic.java b/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/AbstractQueryLogic.java
deleted file mode 100644
index 5c7c20c..0000000
--- a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/AbstractQueryLogic.java
+++ /dev/null
@@ -1,883 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.wikisearch.logic;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.RegExFilter;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
-import org.apache.accumulo.examples.wikisearch.iterator.BooleanLogicIterator;
-import org.apache.accumulo.examples.wikisearch.iterator.EvaluatingIterator;
-import org.apache.accumulo.examples.wikisearch.iterator.OptimizedQueryIterator;
-import org.apache.accumulo.examples.wikisearch.iterator.ReadAheadIterator;
-import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
-import org.apache.accumulo.examples.wikisearch.normalizer.Normalizer;
-import org.apache.accumulo.examples.wikisearch.parser.EventFields;
-import org.apache.accumulo.examples.wikisearch.parser.EventFields.FieldValue;
-import org.apache.accumulo.examples.wikisearch.parser.FieldIndexQueryReWriter;
-import org.apache.accumulo.examples.wikisearch.parser.JexlOperatorConstants;
-import org.apache.accumulo.examples.wikisearch.parser.QueryParser;
-import org.apache.accumulo.examples.wikisearch.parser.QueryParser.QueryTerm;
-import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator;
-import org.apache.accumulo.examples.wikisearch.sample.Document;
-import org.apache.accumulo.examples.wikisearch.sample.Field;
-import org.apache.accumulo.examples.wikisearch.sample.Results;
-import org.apache.commons.jexl2.parser.ParserTreeConstants;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * <pre>
- * <h2>Overview</h2>
- * Query implementation that works with the JEXL grammar. This
- * uses the metadata, global index, and partitioned table to return
- * results based on the query. Example queries:
- *
- * <b>Single Term Query</b>
- * 'foo' - looks in global index for foo, and if any entries are found, then the query
- * is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
- * down the optimized query path which uses the intersecting iterators on the partitioned
- * table.
- *
- * <b>Boolean expression</b>
- * field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
- * the query is parsed and the set of eventFields in the query that are indexed is determined by
- * querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
- * eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
- *
- * We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
- *
- * ==, !=, >, ≥, <, ≤, =~, and !~
- *
- * Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
- * with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
- * example using this function is : "f:between(LATITUDE,60.0, 70.0)"
- *
- * <h2>Constraints on Query Structure</h2>
- * Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
- * rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. If
- * an error occurs in the evaluation we are skipping the event.
- *
- * <h2>Notes on Optimization</h2>
- * Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
- *
- * 1. An 'or' conjunction exists in the query but not all of the terms are indexed.
- * 2. No indexed terms exist in the query
- * 3. An unsupported operator exists in the query
- *
- * </pre>
- *
- */
-public abstract class AbstractQueryLogic {
-
- protected static Logger log = Logger.getLogger(AbstractQueryLogic.class);
-
- /**
- * Set of datatypes to limit the query to.
- */
- public static final String DATATYPE_FILTER_SET = "datatype.filter.set";
-
- private static class DoNotPerformOptimizedQueryException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
- /**
- * Object that is used to hold ranges found in the index. Subclasses may compute the final range set in various ways.
- */
- public static abstract class IndexRanges {
-
- private Map<String,String> indexValuesToOriginalValues = null;
- private Multimap<String,String> fieldNamesAndValues = HashMultimap.create();
- private Map<String,Long> termCardinality = new HashMap<String,Long>();
- protected Map<String,TreeSet<Range>> ranges = new HashMap<String,TreeSet<Range>>();
-
- public Multimap<String,String> getFieldNamesAndValues() {
- return fieldNamesAndValues;
- }
-
- public void setFieldNamesAndValues(Multimap<String,String> fieldNamesAndValues) {
- this.fieldNamesAndValues = fieldNamesAndValues;
- }
-
- public final Map<String,Long> getTermCardinality() {
- return termCardinality;
- }
-
- public Map<String,String> getIndexValuesToOriginalValues() {
- return indexValuesToOriginalValues;
- }
-
- public void setIndexValuesToOriginalValues(Map<String,String> indexValuesToOriginalValues) {
- this.indexValuesToOriginalValues = indexValuesToOriginalValues;
- }
-
- public abstract void add(String term, Range r);
-
- public abstract Set<Range> getRanges();
- }
-
- /**
- * Object that computes the ranges by unioning all of the ranges for all of the terms together. In the case where ranges overlap, the largest range is used.
- */
- public static class UnionIndexRanges extends IndexRanges {
-
- public static String DEFAULT_KEY = "default";
-
- public UnionIndexRanges() {
- this.ranges.put(DEFAULT_KEY, new TreeSet<Range>());
- }
-
- public Set<Range> getRanges() {
- // So the set of ranges is ordered. It *should* be the case that
- // ranges with partition ids will sort before ranges that point to
- // a specific event. Populate a new set of ranges but don't add a
- // range for an event where that range is contained in a range already
- // added.
- Set<Text> shardsAdded = new HashSet<Text>();
- Set<Range> returnSet = new HashSet<Range>();
- for (Range r : ranges.get(DEFAULT_KEY)) {
- if (!shardsAdded.contains(r.getStartKey().getRow())) {
- // Only add ranges with a start key for the entire partition.
- if (r.getStartKey().getColumnFamily() == null) {
- shardsAdded.add(r.getStartKey().getRow());
- }
- returnSet.add(r);
- } else {
- // if (log.isTraceEnabled())
- log.info("Skipping event specific range: " + r.toString() + " because range has already been added: "
- + shardsAdded.contains(r.getStartKey().getRow()));
- }
- }
- return returnSet;
- }
-
- public void add(String term, Range r) {
- ranges.get(DEFAULT_KEY).add(r);
- }
- }
-
- private String metadataTableName;
- private String indexTableName;
- private String reverseIndexTableName;
- private String tableName;
- private int queryThreads = 8;
- private String readAheadQueueSize;
- private String readAheadTimeOut;
- private boolean useReadAheadIterator;
- private Kryo kryo = new Kryo();
- private EventFields eventFields = new EventFields();
- private List<String> unevaluatedFields = null;
- private Map<Class<? extends Normalizer>,Normalizer> normalizerCacheMap = new HashMap<Class<? extends Normalizer>,Normalizer>();
- private static final String NULL_BYTE = "\u0000";
-
- public AbstractQueryLogic() {
- super();
- EventFields.initializeKryo(kryo);
- }
-
- /**
- * Queries metadata table to determine which terms are indexed.
- *
- * @param c
- * @param auths
- * @param queryLiterals
- * @param datatypes
- * - optional list of types
- * @return map of indexed field names to types to normalizers used in this date range
- * @throws TableNotFoundException
- * @throws IllegalAccessException
- * @throws InstantiationException
- */
- protected Map<String,Multimap<String,Class<? extends Normalizer>>> findIndexedTerms(Connector c, Authorizations auths, Set<String> queryLiterals,
- Set<String> datatypes) throws TableNotFoundException, InstantiationException, IllegalAccessException {
-
- Map<String,Multimap<String,Class<? extends Normalizer>>> results = new HashMap<String,Multimap<String,Class<? extends Normalizer>>>();
-
- for (String literal : queryLiterals) {
- if (log.isDebugEnabled())
- log.debug("Querying " + this.getMetadataTableName() + " table for " + literal);
- Range range = new Range(literal.toUpperCase());
- Scanner scanner = c.createScanner(this.getMetadataTableName(), auths);
- scanner.setRange(range);
- scanner.fetchColumnFamily(new Text(WikipediaMapper.METADATA_INDEX_COLUMN_FAMILY));
- for (Entry<Key,Value> entry : scanner) {
- if (!results.containsKey(literal)) {
- Multimap<String,Class<? extends Normalizer>> m = HashMultimap.create();
- results.put(literal, m);
- }
- // Get the column qualifier from the key. It contains the datatype and normalizer class
- String colq = entry.getKey().getColumnQualifier().toString();
- if (null != colq && colq.contains("\0")) {
- int idx = colq.indexOf("\0");
- if (idx != -1) {
- String type = colq.substring(0, idx);
- // If types are specified and this type is not in the list then skip it.
- if (null != datatypes && !datatypes.contains(type))
- continue;
- try {
- @SuppressWarnings("unchecked")
- Class<? extends Normalizer> clazz = (Class<? extends Normalizer>) Class.forName(colq.substring(idx + 1));
- if (!normalizerCacheMap.containsKey(clazz))
- normalizerCacheMap.put(clazz, clazz.newInstance());
- results.get(literal).put(type, clazz);
- } catch (ClassNotFoundException e) {
- log.error("Unable to find normalizer on class path: " + colq.substring(idx + 1), e);
- results.get(literal).put(type, LcNoDiacriticsNormalizer.class);
- }
- } else {
- log.warn("EventMetadata entry did not contain NULL byte: " + entry.getKey().toString());
- }
- } else {
- log.warn("ColumnQualifier null in EventMetadata for key: " + entry.getKey().toString());
- }
- }
- }
- if (log.isDebugEnabled())
- log.debug("METADATA RESULTS: " + results.toString());
- return results;
- }
-
- /**
- * Performs a lookup in the global index for a single non-fielded term.
- *
- * @param c
- * @param auths
- * @param value
- * @param datatypes
- * - optional list of types
- * @return ranges that fit into the date range.
- */
- protected abstract IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> datatypes) throws TableNotFoundException;
-
- /**
- * Performs a lookup in the global index / reverse index and returns a RangeCalculator
- *
- * @param c
- * Accumulo connection
- * @param auths
- * authset for queries
- * @param indexedTerms
- * multimap of indexed field name and Normalizers used
- * @param terms
- * multimap of field name and QueryTerm object
- * @param indexTableName
- * @param reverseIndexTableName
- * @param queryString
- * original query string
- * @param queryThreads
- * @param datatypes
- * - optional list of types
- * @return range calculator
- * @throws TableNotFoundException
- */
- protected abstract RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
- Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> datatypes)
- throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException;
-
- protected abstract Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms);
-
- public String getMetadataTableName() {
- return metadataTableName;
- }
-
- public String getIndexTableName() {
- return indexTableName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setMetadataTableName(String metadataTableName) {
- this.metadataTableName = metadataTableName;
- }
-
- public void setIndexTableName(String indexTableName) {
- this.indexTableName = indexTableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public int getQueryThreads() {
- return queryThreads;
- }
-
- public void setQueryThreads(int queryThreads) {
- this.queryThreads = queryThreads;
- }
-
- public String getReadAheadQueueSize() {
- return readAheadQueueSize;
- }
-
- public String getReadAheadTimeOut() {
- return readAheadTimeOut;
- }
-
- public boolean isUseReadAheadIterator() {
- return useReadAheadIterator;
- }
-
- public void setReadAheadQueueSize(String readAheadQueueSize) {
- this.readAheadQueueSize = readAheadQueueSize;
- }
-
- public void setReadAheadTimeOut(String readAheadTimeOut) {
- this.readAheadTimeOut = readAheadTimeOut;
- }
-
- public void setUseReadAheadIterator(boolean useReadAheadIterator) {
- this.useReadAheadIterator = useReadAheadIterator;
- }
-
- public String getReverseIndexTableName() {
- return reverseIndexTableName;
- }
-
- public void setReverseIndexTableName(String reverseIndexTableName) {
- this.reverseIndexTableName = reverseIndexTableName;
- }
-
- public List<String> getUnevaluatedFields() {
- return unevaluatedFields;
- }
-
- public void setUnevaluatedFields(List<String> unevaluatedFields) {
- this.unevaluatedFields = unevaluatedFields;
- }
-
- public void setUnevaluatedFields(String unevaluatedFieldList) {
- this.unevaluatedFields = new ArrayList<String>();
- for (String field : unevaluatedFieldList.split(","))
- this.unevaluatedFields.add(field);
- }
-
- public Document createDocument(Key key, Value value) {
- Document doc = new Document();
-
- eventFields.clear();
- ByteBuffer buf = ByteBuffer.wrap(value.get());
- eventFields.readObjectData(kryo, buf);
-
- // Set the id to the document id which is located in the colf
- String row = key.getRow().toString();
- String colf = key.getColumnFamily().toString();
- int idx = colf.indexOf(NULL_BYTE);
- String type = colf.substring(0, idx);
- String id = colf.substring(idx + 1);
- doc.setId(id);
- for (Entry<String,Collection<FieldValue>> entry : eventFields.asMap().entrySet()) {
- for (FieldValue fv : entry.getValue()) {
- Field val = new Field();
- val.setFieldName(entry.getKey());
- val.setFieldValue(new String(fv.getValue(), Charset.forName("UTF-8")));
- doc.getFields().add(val);
- }
- }
-
- // Add the pointer for the content.
- Field docPointer = new Field();
- docPointer.setFieldName("DOCUMENT");
- docPointer.setFieldValue("DOCUMENT:" + row + "/" + type + "/" + id);
- doc.getFields().add(docPointer);
-
- return doc;
- }
-
- public String getResultsKey(Entry<Key,Value> key) {
- // Use the colf from the table, it contains the uuid and datatype
- return key.getKey().getColumnFamily().toString();
- }
-
- public Results runQuery(Connector connector, List<String> authorizations, String query, Date beginDate, Date endDate, Set<String> types) {
-
- if (StringUtils.isEmpty(query)) {
- throw new IllegalArgumentException("NULL QueryNode reference passed to " + this.getClass().getSimpleName());
- }
-
- Set<Range> ranges = new HashSet<Range>();
- Set<String> typeFilter = types;
- String array[] = authorizations.toArray(new String[0]);
- Authorizations auths = new Authorizations(array);
- Results results = new Results();
-
- // Get the query string
- String queryString = query;
-
- StopWatch abstractQueryLogic = new StopWatch();
- StopWatch optimizedQuery = new StopWatch();
- StopWatch queryGlobalIndex = new StopWatch();
- StopWatch optimizedEventQuery = new StopWatch();
- StopWatch fullScanQuery = new StopWatch();
- StopWatch processResults = new StopWatch();
-
- abstractQueryLogic.start();
-
- StopWatch parseQuery = new StopWatch();
- parseQuery.start();
-
- QueryParser parser;
- try {
- if (log.isDebugEnabled()) {
- log.debug("ShardQueryLogic calling QueryParser.execute");
- }
- parser = new QueryParser();
- parser.execute(queryString);
- } catch (org.apache.commons.jexl2.parser.ParseException e1) {
- throw new IllegalArgumentException("Error parsing query", e1);
- }
- int hash = parser.getHashValue();
- parseQuery.stop();
- if (log.isDebugEnabled()) {
- log.debug(hash + " Query: " + queryString);
- }
-
- Set<String> fields = new HashSet<String>();
- for (String f : parser.getQueryIdentifiers()) {
- fields.add(f);
- }
- if (log.isDebugEnabled()) {
- log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString());
- }
- // Remove any negated fields from the fields list, we don't want to lookup negated fields
- // in the index.
- fields.removeAll(parser.getNegatedTermsForOptimizer());
-
- if (log.isDebugEnabled()) {
- log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString());
- }
- // Get the mapping of field name to QueryTerm object from the query. The query term object
- // contains the operator, whether its negated or not, and the literal to test against.
- Multimap<String,QueryTerm> terms = parser.getQueryTerms();
-
- // Find out which terms are indexed
- // TODO: Should we cache indexed terms or does that not make sense since we are always
- // loading data.
- StopWatch queryMetadata = new StopWatch();
- queryMetadata.start();
- Map<String,Multimap<String,Class<? extends Normalizer>>> metadataResults;
- try {
- metadataResults = findIndexedTerms(connector, auths, fields, typeFilter);
- } catch (Exception e1) {
- throw new RuntimeException("Error in metadata lookup", e1);
- }
-
- // Create a map of indexed term to set of normalizers for it
- Multimap<String,Normalizer> indexedTerms = HashMultimap.create();
- for (Entry<String,Multimap<String,Class<? extends Normalizer>>> entry : metadataResults.entrySet()) {
- // Get the normalizer from the normalizer cache
- for (Class<? extends Normalizer> clazz : entry.getValue().values()) {
- indexedTerms.put(entry.getKey(), normalizerCacheMap.get(clazz));
- }
- }
- queryMetadata.stop();
- if (log.isDebugEnabled()) {
- log.debug(hash + " Indexed Terms: " + indexedTerms.toString());
- }
-
- Set<String> orTerms = parser.getOrTermsForOptimizer();
-
- // Iterate over the query terms to get the operators specified in the query.
- ArrayList<String> unevaluatedExpressions = new ArrayList<String>();
- boolean unsupportedOperatorSpecified = false;
- for (Entry<String,QueryTerm> entry : terms.entries()) {
- if (null == entry.getValue()) {
- continue;
- }
-
- if (null != this.unevaluatedFields && this.unevaluatedFields.contains(entry.getKey().trim())) {
- unevaluatedExpressions.add(entry.getKey().trim() + " " + entry.getValue().getOperator() + " " + entry.getValue().getValue());
- }
-
- int operator = JexlOperatorConstants.getJJTNodeType(entry.getValue().getOperator());
- if (!(operator == ParserTreeConstants.JJTEQNODE || operator == ParserTreeConstants.JJTNENODE || operator == ParserTreeConstants.JJTLENODE
- || operator == ParserTreeConstants.JJTLTNODE || operator == ParserTreeConstants.JJTGENODE || operator == ParserTreeConstants.JJTGTNODE || operator == ParserTreeConstants.JJTERNODE)) {
- unsupportedOperatorSpecified = true;
- break;
- }
- }
- if (null != unevaluatedExpressions)
- unevaluatedExpressions.trimToSize();
- if (log.isDebugEnabled()) {
- log.debug(hash + " unsupportedOperators: " + unsupportedOperatorSpecified + " indexedTerms: " + indexedTerms.toString() + " orTerms: "
- + orTerms.toString() + " unevaluatedExpressions: " + unevaluatedExpressions.toString());
- }
-
- // We can use the intersecting iterator over the field index as an optimization under the
- // following conditions
- //
- // 1. No unsupported operators in the query.
- // 2. No 'or' operators and at least one term indexed
- // or
- // 1. No unsupported operators in the query.
- // 2. and all terms indexed
- // or
- // 1. All or'd terms are indexed. NOTE, this will potentially skip some queries and push to a full table scan
- // // WE should look into finding a better way to handle whether we do an optimized query or not.
- boolean optimizationSucceeded = false;
- boolean orsAllIndexed = false;
- if (orTerms.isEmpty()) {
- orsAllIndexed = false;
- } else {
- orsAllIndexed = indexedTerms.keySet().containsAll(orTerms);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("All or terms are indexed");
- }
-
- if (!unsupportedOperatorSpecified
- && (((null == orTerms || orTerms.isEmpty()) && indexedTerms.size() > 0) || (fields.size() > 0 && indexedTerms.size() == fields.size()) || orsAllIndexed)) {
- optimizedQuery.start();
- // Set up intersecting iterator over field index.
-
- // Get information from the global index for the indexed terms. The results object will contain the term
- // mapped to an object that contains the total count, and partitions where this term is located.
-
- // TODO: Should we cache indexed term information or does that not make sense since we are always loading data
- queryGlobalIndex.start();
- IndexRanges termIndexInfo;
- try {
- // If fields is null or zero, then it's probably the case that the user entered a value
- // to search for with no fields. Check for the value in index.
- if (fields.isEmpty()) {
- termIndexInfo = this.getTermIndexInformation(connector, auths, queryString, typeFilter);
- if (null != termIndexInfo && termIndexInfo.getRanges().isEmpty()) {
- // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
- // in unhandled locations.
- // Break out of here by throwing a named exception and do full scan
- throw new DoNotPerformOptimizedQueryException();
- }
- // We need to rewrite the query string here so that it's valid.
- if (termIndexInfo instanceof UnionIndexRanges) {
- UnionIndexRanges union = (UnionIndexRanges) termIndexInfo;
- StringBuilder buf = new StringBuilder();
- String sep = "";
- for (String fieldName : union.getFieldNamesAndValues().keySet()) {
- buf.append(sep).append(fieldName).append(" == ");
- if (!(queryString.startsWith("'") && queryString.endsWith("'"))) {
- buf.append("'").append(queryString).append("'");
- } else {
- buf.append(queryString);
- }
- sep = " or ";
- }
- if (log.isDebugEnabled()) {
- log.debug("Rewrote query for non-fielded single term query: " + queryString + " to " + buf.toString());
- }
- queryString = buf.toString();
- } else {
- throw new RuntimeException("Unexpected IndexRanges implementation");
- }
- } else {
- RangeCalculator calc = this.getTermIndexInformation(connector, auths, indexedTerms, terms, this.getIndexTableName(), this.getReverseIndexTableName(),
- queryString, this.queryThreads, typeFilter);
- if (null == calc.getResult() || calc.getResult().isEmpty()) {
- // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
- // in unhandled locations.
- // Break out of here by throwing a named exception and do full scan
- throw new DoNotPerformOptimizedQueryException();
- }
- termIndexInfo = new UnionIndexRanges();
- termIndexInfo.setIndexValuesToOriginalValues(calc.getIndexValues());
- termIndexInfo.setFieldNamesAndValues(calc.getIndexEntries());
- termIndexInfo.getTermCardinality().putAll(calc.getTermCardinalities());
- for (Range r : calc.getResult()) {
- // foo is a placeholder and is ignored.
- termIndexInfo.add("foo", r);
- }
- }
- } catch (TableNotFoundException e) {
- log.error(this.getIndexTableName() + "not found", e);
- throw new RuntimeException(this.getIndexTableName() + "not found", e);
- } catch (org.apache.commons.jexl2.parser.ParseException e) {
- throw new RuntimeException("Error determining ranges for query: " + queryString, e);
- } catch (DoNotPerformOptimizedQueryException e) {
- log.info("Indexed fields not found in index, performing full scan");
- termIndexInfo = null;
- }
- queryGlobalIndex.stop();
-
- // Determine if we should proceed with optimized query based on results from the global index
- boolean proceed = false;
- if (null == termIndexInfo || termIndexInfo.getFieldNamesAndValues().values().size() == 0) {
- proceed = false;
- } else if (null != orTerms && orTerms.size() > 0 && (termIndexInfo.getFieldNamesAndValues().values().size() == indexedTerms.size())) {
- proceed = true;
- } else if (termIndexInfo.getFieldNamesAndValues().values().size() > 0) {
- proceed = true;
- } else if (orsAllIndexed) {
- proceed = true;
- } else {
- proceed = false;
- }
- if (log.isDebugEnabled()) {
- log.debug("Proceed with optimized query: " + proceed);
- if (null != termIndexInfo)
- log.debug("termIndexInfo.getTermsFound().size(): " + termIndexInfo.getFieldNamesAndValues().values().size() + " indexedTerms.size: "
- + indexedTerms.size() + " fields.size: " + fields.size());
- }
- if (proceed) {
-
- if (log.isDebugEnabled()) {
- log.debug(hash + " Performing optimized query");
- }
- // Use the scan ranges from the GlobalIndexRanges object as the ranges for the batch scanner
- ranges = termIndexInfo.getRanges();
- if (log.isDebugEnabled()) {
- log.info(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
- }
-
- // Create BatchScanner, set the ranges, and setup the iterators.
- optimizedEventQuery.start();
- BatchScanner bs = null;
- try {
- bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
- bs.setRanges(ranges);
- IteratorSetting si = new IteratorSetting(21, "eval", OptimizedQueryIterator.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
- }
- // Set the query option
- si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
- // Set the Indexed Terms List option. This is the field name and normalized field value pair separated
- // by a comma.
- StringBuilder buf = new StringBuilder();
- String sep = "";
- for (Entry<String,String> entry : termIndexInfo.getFieldNamesAndValues().entries()) {
- buf.append(sep);
- buf.append(entry.getKey());
- buf.append(":");
- buf.append(termIndexInfo.getIndexValuesToOriginalValues().get(entry.getValue()));
- buf.append(":");
- buf.append(entry.getValue());
- if (sep.equals("")) {
- sep = ";";
- }
- }
- if (log.isDebugEnabled()) {
- log.debug("Setting scan option: " + FieldIndexQueryReWriter.INDEXED_TERMS_LIST + " to " + buf.toString());
- }
- FieldIndexQueryReWriter rewriter = new FieldIndexQueryReWriter();
- String q = "";
- try {
- q = queryString;
- q = rewriter.applyCaseSensitivity(q, true, false);// Set upper/lower case for fieldname/fieldvalue
- Map<String,String> opts = new HashMap<String,String>();
- opts.put(FieldIndexQueryReWriter.INDEXED_TERMS_LIST, buf.toString());
- q = rewriter.removeNonIndexedTermsAndInvalidRanges(q, opts);
- q = rewriter.applyNormalizedTerms(q, opts);
- if (log.isDebugEnabled()) {
- log.debug("runServerQuery, FieldIndex Query: " + q);
- }
- } catch (org.apache.commons.jexl2.parser.ParseException ex) {
- log.error("Could not parse query, Jexl ParseException: " + ex);
- } catch (Exception ex) {
- log.error("Problem rewriting query, Exception: " + ex.getMessage());
- }
- si.addOption(BooleanLogicIterator.FIELD_INDEX_QUERY, q);
-
- // Set the term cardinality option
- sep = "";
- buf.delete(0, buf.length());
- for (Entry<String,Long> entry : termIndexInfo.getTermCardinality().entrySet()) {
- buf.append(sep);
- buf.append(entry.getKey());
- buf.append(":");
- buf.append(entry.getValue());
- sep = ",";
- }
- if (log.isDebugEnabled())
- log.debug("Setting scan option: " + BooleanLogicIterator.TERM_CARDINALITIES + " to " + buf.toString());
- si.addOption(BooleanLogicIterator.TERM_CARDINALITIES, buf.toString());
- if (this.useReadAheadIterator) {
- if (log.isDebugEnabled()) {
- log.debug("Enabling read ahead iterator with queue size: " + this.readAheadQueueSize + " and timeout: " + this.readAheadTimeOut);
- }
- si.addOption(ReadAheadIterator.QUEUE_SIZE, this.readAheadQueueSize);
- si.addOption(ReadAheadIterator.TIMEOUT, this.readAheadTimeOut);
-
- }
-
- if (null != unevaluatedExpressions) {
- StringBuilder unevaluatedExpressionList = new StringBuilder();
- String sep2 = "";
- for (String exp : unevaluatedExpressions) {
- unevaluatedExpressionList.append(sep2).append(exp);
- sep2 = ",";
- }
- if (log.isDebugEnabled())
- log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
- si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
- }
-
- bs.addScanIterator(si);
-
- processResults.start();
- processResults.suspend();
- long count = 0;
- for (Entry<Key,Value> entry : bs) {
- count++;
- // The key that is returned by the EvaluatingIterator is not the same key that is in
- // the table. The value that is returned by the EvaluatingIterator is a kryo
- // serialized EventFields object.
- processResults.resume();
- Document d = this.createDocument(entry.getKey(), entry.getValue());
- results.getResults().add(d);
- processResults.suspend();
- }
- log.info(count + " matching entries found in optimized query.");
- optimizationSucceeded = true;
- processResults.stop();
- } catch (TableNotFoundException e) {
- log.error(this.getTableName() + "not found", e);
- throw new RuntimeException(this.getIndexTableName() + "not found", e);
- } finally {
- if (bs != null) {
- bs.close();
- }
- }
- optimizedEventQuery.stop();
- }
- optimizedQuery.stop();
- }
-
- // WE should look into finding a better way to handle whether we do an optimized query or not.
- // We are not setting up an else condition here because we may have aborted the logic early in the if statement.
- if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()) && !orsAllIndexed)) {
- // if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()))) {
- fullScanQuery.start();
- if (log.isDebugEnabled()) {
- log.debug(hash + " Performing full scan query");
- }
-
- // Set up a full scan using the date ranges from the query
- // Create BatchScanner, set the ranges, and setup the iterators.
- BatchScanner bs = null;
- try {
- // The ranges are the start and end dates
- Collection<Range> r = getFullScanRange(beginDate, endDate, terms);
- ranges.addAll(r);
-
- if (log.isDebugEnabled()) {
- log.debug(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
- }
-
- bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
- bs.setRanges(ranges);
- IteratorSetting si = new IteratorSetting(22, "eval", EvaluatingIterator.class);
- // Create datatype regex if needed
- if (null != typeFilter) {
- StringBuilder buf = new StringBuilder();
- String s = "";
- for (String type : typeFilter) {
- buf.append(s).append(type).append(".*");
- s = "|";
- }
- if (log.isDebugEnabled())
- log.debug("Setting colf regex iterator to: " + buf.toString());
- IteratorSetting ri = new IteratorSetting(21, "typeFilter", RegExFilter.class);
- RegExFilter.setRegexs(ri, null, buf.toString(), null, null, false);
- bs.addScanIterator(ri);
- }
- if (log.isDebugEnabled()) {
- log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
- }
- si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
- if (null != unevaluatedExpressions) {
- StringBuilder unevaluatedExpressionList = new StringBuilder();
- String sep2 = "";
- for (String exp : unevaluatedExpressions) {
- unevaluatedExpressionList.append(sep2).append(exp);
- sep2 = ",";
- }
- if (log.isDebugEnabled())
- log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
- si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
- }
- bs.addScanIterator(si);
- long count = 0;
- processResults.start();
- processResults.suspend();
- for (Entry<Key,Value> entry : bs) {
- count++;
- // The key that is returned by the EvaluatingIterator is not the same key that is in
- // the partition table. The value that is returned by the EvaluatingIterator is a kryo
- // serialized EventFields object.
- processResults.resume();
- Document d = this.createDocument(entry.getKey(), entry.getValue());
- results.getResults().add(d);
- processResults.suspend();
- }
- processResults.stop();
- log.info(count + " matching entries found in full scan query.");
- } catch (TableNotFoundException e) {
- log.error(this.getTableName() + "not found", e);
- } finally {
- if (bs != null) {
- bs.close();
- }
- }
- fullScanQuery.stop();
- }
-
- log.info("AbstractQueryLogic: " + queryString + " " + timeString(abstractQueryLogic.getTime()));
- log.info(" 1) parse query " + timeString(parseQuery.getTime()));
- log.info(" 2) query metadata " + timeString(queryMetadata.getTime()));
- log.info(" 3) full scan query " + timeString(fullScanQuery.getTime()));
- log.info(" 3) optimized query " + timeString(optimizedQuery.getTime()));
- log.info(" 1) process results " + timeString(processResults.getTime()));
- log.info(" 1) query global index " + timeString(queryGlobalIndex.getTime()));
- log.info(hash + " Query completed.");
-
- return results;
- }
-
- private static String timeString(long millis) {
- return String.format("%4.2f", millis / 1000.);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/ContentLogic.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/ContentLogic.java b/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/ContentLogic.java
deleted file mode 100644
index 2e65a38..0000000
--- a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/ContentLogic.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.wikisearch.logic;
-
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
-import org.apache.accumulo.examples.wikisearch.sample.Document;
-import org.apache.accumulo.examples.wikisearch.sample.Field;
-import org.apache.accumulo.examples.wikisearch.sample.Results;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
-
-/**
- * This query table implementation returns a Results object that contains documents from the wiki table. The query will contain the partition id, wikitype, and
- * UID so that we can seek directly to the document. The document is stored as base64 compressed binary in the Accumulo table. We will decompress the data so
- * that it is base64 encoded binary data in the Results object.
- *
- * The query that needs to be passed to the web service is: DOCUMENT:partitionId/wikitype/uid.
- *
- */
-public class ContentLogic {
-
- private static final Logger log = Logger.getLogger(ContentLogic.class);
-
- private static final String NULL_BYTE = "\u0000";
-
- private String tableName = null;
-
- private Pattern queryPattern = Pattern.compile("^DOCUMENT:(.*)/(.*)/(.*)$");
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public Results runQuery(Connector connector, String query, List<String> authorizations) {
-
- Results results = new Results();
- Authorizations auths = new Authorizations(StringUtils.join(authorizations, "|"));
-
- Matcher match = queryPattern.matcher(query);
- if (!match.matches()) {
- throw new IllegalArgumentException("Query does not match the pattern: DOCUMENT:partitionId/wikitype/uid, your query: " + query.toString());
- } else {
- String partitionId = match.group(1);
- String wikitype = match.group(2);
- String id = match.group(3);
-
- log.debug("Received pieces: " + partitionId + ", " + wikitype + ", " + id);
-
- // Create the Range
- Key startKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype + NULL_BYTE + id);
- Key endKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype + NULL_BYTE + id + NULL_BYTE);
- Range r = new Range(startKey, true, endKey, false);
-
- log.debug("Setting range: " + r);
-
- try {
- Scanner scanner = connector.createScanner(this.getTableName(), auths);
- scanner.setRange(r);
- // This should in theory only match one thing.
- for (Entry<Key,Value> entry : scanner) {
- Document doc = new Document();
- doc.setId(id);
- Field val = new Field();
- val.setFieldName("DOCUMENT");
- val.setFieldValue(new String(Base64.decodeBase64(entry.getValue().toString())));
- doc.getFields().add(val);
- results.getResults().add(doc);
- }
- } catch (TableNotFoundException e) {
- throw new RuntimeException("Table not found: " + this.getTableName(), e);
- }
-
- }
- return results;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/QueryLogic.java
----------------------------------------------------------------------
diff --git a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/QueryLogic.java b/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/QueryLogic.java
deleted file mode 100644
index bcfeb70..0000000
--- a/src/examples/wikisearch/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/QueryLogic.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.wikisearch.logic;
-
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.examples.wikisearch.iterator.EvaluatingIterator;
-import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
-import org.apache.accumulo.examples.wikisearch.normalizer.Normalizer;
-import org.apache.accumulo.examples.wikisearch.parser.QueryParser.QueryTerm;
-import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
-import org.apache.accumulo.examples.wikisearch.util.TextUtil;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Multimap;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * <pre>
- * <h2>Overview</h2>
- * QueryTable implementation that works with the JEXL grammar. This QueryTable
- * uses the metadata, global index, and partitioned table to return
- * results based on the query. Example queries:
- *
- * <b>Single Term Query</b>
- * 'foo' - looks in global index for foo, and if any entries are found, then the query
- * is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
- * down the optimized query path which uses the intersecting iterators on the shard
- * table.
- *
- * <b>Boolean expression</b>
- * field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
- * the query is parsed and the set of eventFields in the query that are indexed is determined by
- * querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
- * eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
- *
- * We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
- *
- * ==, !=, >, ≥, <, ≤, =~, and !~
- *
- * Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
- * with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
- * example using this function is : "f:between(LATITUDE,60.0, 70.0)"
- *
- * <h2>Constraints on Query Structure</h2>
- * Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
- * rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. Users
- * should also be aware that the literals used in the query need to match the data in the table. If an error occurs in the evaluation
- * we are skipping the event.
- *
- * <h2>Notes on Optimization</h2>
- * Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
- *
- * 1. An 'or' conjunction exists in the query but not all of the terms are indexed.
- * 2. No indexed terms exist in the query
- * 3. An unsupported operator exists in the query
- *
- * </pre>
- *
- */
- public class QueryLogic extends AbstractQueryLogic {
-
-   protected static Logger log = Logger.getLogger(QueryLogic.class);
-
-   public QueryLogic() {
-     super();
-   }
-
-   /**
-    * Computes index information for a parsed query by running a {@link RangeCalculator}. The indexTableName,
-    * reverseIndexTableName and queryThreads arguments are not used by this implementation.
-    *
-    * @return the RangeCalculator after {@code execute} has run
-    */
-   @Override
-   protected RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
-       Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> typeFilter)
-       throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException {
-     RangeCalculator calc = new RangeCalculator();
-     calc.execute(c, auths, indexedTerms, terms, queryString, this, typeFilter);
-     return calc;
-   }
-
-   /**
-    * Returns a single unbounded {@code new Range()}, so a full scan covers the whole table; begin, end and terms are
-    * ignored.
-    */
-   protected Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms) {
-     return Collections.singletonList(new Range());
-   }
-
-   /**
-    * Looks up an unfielded term in the global index and builds ranges into the partitioned table for every index hit.
-    * All hits are recorded under the placeholder term name "DUMMY".
-    *
-    * Each index entry is read as: row = normalized value, column family = field name, column qualifier =
-    * shardId optionally followed by NULL_BYTE_STRING and a datatype. Entries whose datatype is not in
-    * {@code typeFilter} are skipped (entries without a datatype are always kept). For the rest, the field name is
-    * recorded, the entry's UID count is added to the term cardinality, and either one range per UID or a whole-shard
-    * range is added.
-    *
-    * @param c connection used to scan the index table
-    * @param auths authorizations for the index scan
-    * @param value the raw term; normalized with LcNoDiacriticsNormalizer and stripped of surrounding single quotes
-    * @param typeFilter datatypes to keep, or null to keep all
-    * @return the accumulated index ranges
-    * @throws TableNotFoundException if the index table does not exist
-    */
-   @Override
-   protected IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> typeFilter) throws TableNotFoundException {
-     final String dummyTermName = "DUMMY";
-     UnionIndexRanges indexRanges = new UnionIndexRanges();
-
-     // The entries in the index are normalized, since we don't have a field, just try using the LcNoDiacriticsNormalizer.
-     String normalizedFieldValue = new LcNoDiacriticsNormalizer().normalizeFieldValue("", value);
-     // Remove the begin and end ' marks. The length check keeps a lone "'" from producing substring(1, 0).
-     if (normalizedFieldValue.length() >= 2 && normalizedFieldValue.startsWith("'") && normalizedFieldValue.endsWith("'")) {
-       normalizedFieldValue = normalizedFieldValue.substring(1, normalizedFieldValue.length() - 1);
-     }
-     Text fieldValue = new Text(normalizedFieldValue);
-     if (log.isDebugEnabled()) {
-       log.debug("Querying index table : " + this.getIndexTableName() + " for normalized indexed term: " + fieldValue);
-     }
-     Scanner scanner = c.createScanner(this.getIndexTableName(), auths);
-     Range r = new Range(fieldValue);
-     scanner.setRange(r);
-     if (log.isDebugEnabled()) {
-       log.debug("Range for index query: " + r.toString());
-     }
-     for (Entry<Key,Value> entry : scanner) {
-       if (log.isDebugEnabled()) {
-         log.debug("Index entry: " + entry.getKey().toString());
-       }
-       // Get the shard id and datatype from the colq
-       String fieldName = entry.getKey().getColumnFamily().toString();
-       String colq = entry.getKey().getColumnQualifier().toString();
-       int separator = colq.indexOf(EvaluatingIterator.NULL_BYTE_STRING);
-       String shardId = null;
-       String datatype = null;
-       if (separator != -1) {
-         shardId = colq.substring(0, separator);
-         datatype = colq.substring(separator + 1);
-       } else {
-         shardId = colq;
-       }
-       // Skip this entry if the type is not correct
-       if (null != datatype && null != typeFilter && !typeFilter.contains(datatype)) {
-         continue;
-       }
-       // Parse the UID.List object from the value
-       Uid.List uidList = null;
-       try {
-         uidList = Uid.List.parseFrom(entry.getValue().get());
-       } catch (InvalidProtocolBufferException e) {
-         // Don't add UID information, at least we know what shards it is located in (handled below).
-         log.warn("Unable to parse UID list for index entry " + entry.getKey() + "; falling back to a whole-shard range", e);
-       }
-
-       // Add the count for this shard to the total count for the term. Without a parsed UID list there is no count
-       // to add. NOTE(review): confirm callers tolerate a missing "DUMMY" cardinality when every entry fails to parse.
-       if (uidList != null) {
-         Long storedCount = indexRanges.getTermCardinality().get(dummyTermName);
-         long count = uidList.getCOUNT() + (storedCount == null ? 0L : storedCount);
-         indexRanges.getTermCardinality().put(dummyTermName, count);
-       }
-       // Add the field name
-       indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);
-
-       // Create the keys
-       Text shard = new Text(shardId);
-       if (uidList == null || uidList.getIGNORE()) {
-         // Either the index asked us to ignore UIDs or they could not be read: scan the entire shard.
-         indexRanges.add(dummyTermName, new Range(shard));
-       } else if (datatype == null) {
-         // Event column families are built from the datatype, so without one per-UID ranges cannot be formed
-         // (new Text(null) would throw); fall back to the entire shard when there are UIDs to cover.
-         if (!uidList.getUIDList().isEmpty()) {
-           indexRanges.add(dummyTermName, new Range(shard));
-         }
-       } else {
-         // We should have UUIDs, create event ranges: column family = datatype with the uuid appended by
-         // TextUtil.textAppend; the exclusive end adds NULL_BYTE_STRING after that family.
-         for (String uuid : uidList.getUIDList()) {
-           Text cf = new Text(datatype);
-           TextUtil.textAppend(cf, uuid);
-           Key startKey = new Key(shard, cf);
-           Key endKey = new Key(shard, new Text(cf.toString() + EvaluatingIterator.NULL_BYTE_STRING));
-           Range eventRange = new Range(startKey, true, endKey, false);
-           indexRanges.add(dummyTermName, eventRange);
-         }
-       }
-     }
-     if (log.isDebugEnabled()) {
-       log.debug("Found " + indexRanges.getRanges().size() + " entries in the index for field value: " + normalizedFieldValue);
-     }
-     return indexRanges;
-
-   }
-
- }