Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/01/06 23:02:13 UTC
svn commit: r1228459 [9/13] - in /incubator/accumulo/branches/1.4: ./
contrib/accumulo_sample/
src/examples/src/main/java/org/apache/accumulo/examples/wikisearch/
src/trace/ src/wikisearch/ src/wikisearch/ingest/
src/wikisearch/ingest/bin/ src/wikisear...
Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java Fri Jan 6 22:02:09 2012
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.iterator;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * This iterator takes the source iterator (the one below it in the iterator stack) and puts it in a background thread. The background thread continues
+ * processing and fills a queue with the Keys and Values from the source iterator. When seek() is called on this iterator, it pauses the background thread,
+ * clears the queue, calls seek() on the source iterator, then resumes the thread filling the queue.
+ *
+ * Users of this iterator can set the queue size; the default is five elements. Users must be aware of the potential for OutOfMemoryError when using this iterator
+ * with large queue sizes or large objects. This iterator copies the Key and Value from the source iterator and puts them into the queue.
+ *
+ * This iterator introduces some parallelism into the server side iterator stack. One use case for this would be when an iterator takes a relatively long time
+ * to process each K,V pair and causes the iterators above it to wait. By putting the longer running iterator in a background thread we should be able to
+ * achieve greater throughput.
+ *
+ * NOTE: Experimental!
+ *
+ */
+public class ReadAheadIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+
+ private static Logger log = Logger.getLogger(ReadAheadIterator.class);
+
+ public static final String QUEUE_SIZE = "queue.size";
+
+ public static final String TIMEOUT = "timeout";
+
+ private static final QueueElement noMoreDataElement = new QueueElement();
+
+ private int queueSize = 5;
+
+ private int timeout = 60;
+
+ /**
+ *
+ * Class to hold key and value from the producing thread.
+ *
+ */
+ static class QueueElement {
+ Key key = null;
+ Value value = null;
+
+ public QueueElement() {}
+
+ public QueueElement(Key key, Value value) {
+ super();
+ this.key = new Key(key);
+ this.value = new Value(value.get(), true);
+ }
+
+ public Key getKey() {
+ return key;
+ }
+
+ public Value getValue() {
+ return value;
+ }
+ }
+
+ /**
+ *
+ * Thread that produces data from the source iterator and places the results in a queue.
+ *
+ */
+ class ProducerThread extends ReentrantLock implements Runnable {
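+
+ // Extending ReentrantLock lets seek() pause this producer: seek() acquires
+ // the lock, clears the queue, repositions the source iterator, and then
+ // releases the lock so production resumes against the new range.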
+
+ private static final long serialVersionUID = 1L;
+
+ private Exception e = null;
+
+ private int waitTime = timeout;
+
+ private SortedKeyValueIterator<Key,Value> sourceIter = null;
+
+ public ProducerThread(SortedKeyValueIterator<Key,Value> source) {
+ this.sourceIter = source;
+ }
+
+ public void run() {
+ boolean hasMoreData = true;
+ // Keep this thread running while there is more data to read
+ // and items left in the queue to be read off.
+ while (hasMoreData || queue.size() > 0) {
+ try {
+ // Acquire the lock; this will wait if the lock is being
+ // held by the ReadAheadIterator.seek() method.
+ this.lock();
+ // Check to see if there is more data from the iterator below.
+ hasMoreData = sourceIter.hasTop();
+ // Break out of the loop if no more data.
+ if (!hasMoreData)
+ continue;
+ // Put the next K,V onto the queue.
+ try {
+ QueueElement e = new QueueElement(sourceIter.getTopKey(), sourceIter.getTopValue());
+ boolean inserted = false;
+ try {
+ inserted = queue.offer(e, this.waitTime, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ this.e = ie;
+ break;
+ }
+ if (!inserted) {
+ // We timed out; set the error and break out of the loop.
+ this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
+ break;
+ }
+ // Move the iterator to the next K,V for the next iteration of this loop
+ sourceIter.next();
+ } catch (Exception e) {
+ this.e = e;
+ log.error("Error calling next on source iterator", e);
+ break;
+ }
+ } finally {
+ this.unlock();
+ }
+ }
+ // If we broke out of the loop because of an error, then don't put the marker on the queue; just end.
+ if (!hasError()) {
+ // Put the special end of data marker into the queue
+ try {
+ queue.put(noMoreDataElement);
+ } catch (InterruptedException e) {
+ this.e = e;
+ log.error("Error putting End of Data marker onto queue");
+ }
+ }
+ }
+
+ public boolean hasError() {
+ return (this.e != null);
+ }
+
+ public Exception getError() {
+ return this.e;
+ }
+ }
+
+ private SortedKeyValueIterator<Key,Value> source;
+ private ArrayBlockingQueue<QueueElement> queue = null;
+ private QueueElement currentElement = new QueueElement();
+ private ProducerThread thread = null;
+ private Thread t = null;
+
+ protected ReadAheadIterator(ReadAheadIterator other, IteratorEnvironment env) {
+ source = other.source.deepCopy(env);
+ }
+
+ public ReadAheadIterator() {}
+
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new ReadAheadIterator(this, env);
+ }
+
+ public Key getTopKey() {
+ return currentElement.getKey();
+ }
+
+ public Value getTopValue() {
+ return currentElement.getValue();
+ }
+
+ public boolean hasTop() {
+ if (currentElement == noMoreDataElement)
+ return false;
+ return currentElement != null || queue.size() > 0 || source.hasTop();
+ }
+
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ validateOptions(options);
+ this.source = source;
+ queue = new ArrayBlockingQueue<QueueElement>(queueSize);
+ thread = new ProducerThread(this.source);
+ t = new Thread(thread, "ReadAheadIterator-SourceThread");
+ t.start();
+ }
+
+ /**
+ * Populate the key and value
+ */
+ public void next() throws IOException {
+ // Thread startup race condition: make sure the thread
+ // has started before we call this the first time.
+ while (t.getState().equals(State.NEW)) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {}
+ }
+
+ if (t.getState().equals(State.TERMINATED)) {
+ // Thread has terminated; if it recorded an error, propagate it.
+ if (thread.hasError()) {
+ throw new IOException("Background thread has died", thread.getError());
+ }
+ }
+
+ // Pull an element off the queue; this will wait if there is no data yet.
+ try {
+ if (thread.hasError())
+ throw new IOException("background thread has error", thread.getError());
+
+ QueueElement nextElement = null;
+ while (null == nextElement) {
+ try {
+ nextElement = queue.poll(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // TODO: Do we need to do anything here?
+ }
+ if (null == nextElement) {
+ // We have no data and timed out; check for an error condition in the read-ahead thread
+ if (thread.hasError()) {
+ throw new IOException("background thread has error", thread.getError());
+ }
+ }
+ }
+ currentElement = nextElement;
+ } catch (IOException e) {
+ throw new IOException("Error getting element from source iterator", e);
+ }
+ }
+
+ /**
+ * Seek to the next matching cell and call next to populate the key and value.
+ */
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ if (t.isAlive()) {
+ // Check for error
+ if (thread.hasError())
+ throw new IOException("background thread has error", thread.getError());
+
+ try {
+ // Acquire the lock, or wait until it's unlocked by the producer thread.
+ thread.lock();
+ queue.clear();
+ currentElement = null;
+ source.seek(range, columnFamilies, inclusive);
+ } finally {
+ thread.unlock();
+ }
+ next();
+ } else {
+ throw new IOException("source iterator thread has died.");
+ }
+ }
+
+ public IteratorOptions describeOptions() {
+ Map<String,String> options = new HashMap<String,String>();
+ options.put(QUEUE_SIZE, "read ahead queue size");
+ options.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
+ return new IteratorOptions(getClass().getSimpleName(), "Iterator that puts the source in another thread", options, null);
+ }
+
+ public boolean validateOptions(Map<String,String> options) {
+ if (options.containsKey(QUEUE_SIZE))
+ queueSize = Integer.parseInt(options.get(QUEUE_SIZE));
+ if (options.containsKey(TIMEOUT))
+ timeout = Integer.parseInt(options.get(TIMEOUT));
+ return true;
+ }
+
+}
Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
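
A minimal configuration sketch for the iterator above (the priority, option
values, and the scanner variable are illustrative, not part of this commit;
AbstractQueryLogic later in this commit sets the same two options):

    IteratorSetting setting = new IteratorSetting(21, "readahead", ReadAheadIterator.class);
    setting.addOption(ReadAheadIterator.QUEUE_SIZE, "10"); // buffer up to 10 key/value pairs
    setting.addOption(ReadAheadIterator.TIMEOUT, "120");   // producer gives up after 120 seconds
    scanner.addScanIterator(setting);

With these options the producer thread blocks once ten pairs are queued, and
records a TimeoutException if no element is consumed within 120 seconds.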
Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java Fri Jan 6 22:02:09 2012
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.iterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.wikisearch.util.FieldIndexKeyParser;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+
+public class UniqFieldNameValueIterator extends WrappingIterator {
+
+ protected static final Logger log = Logger.getLogger(UniqFieldNameValueIterator.class);
+ // Wrapping iterator only accesses its private source in setSource and getSource
+ // Since this class overrides these methods, it's safest to keep the source declaration here
+ private SortedKeyValueIterator<Key,Value> source;
+ private FieldIndexKeyParser keyParser;
+ private Key topKey = null;
+ private Value topValue = null;
+ private Range overallRange = null;
+ private Range currentSubRange;
+ private Text fieldName = null;
+ private Text fieldValueLowerBound = null;
+ private Text fieldValueUpperBound = null;
+ private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
+ private static final String ONE_BYTE = "\1";
+ private boolean multiRow = false;
+ private boolean seekInclusive = false;
+
+ // -------------------------------------------------------------------------
+ // ------------- Static Methods
+ public static void setLogLevel(Level l) {
+ log.setLevel(l);
+ }
+
+ // -------------------------------------------------------------------------
+ // ------------- Constructors
+ public UniqFieldNameValueIterator(Text fName, Text fValLower, Text fValUpper) {
+ this.fieldName = fName;
+ this.fieldValueLowerBound = fValLower;
+ this.fieldValueUpperBound = fValUpper;
+ keyParser = createDefaultKeyParser();
+ }
+
+ public UniqFieldNameValueIterator(UniqFieldNameValueIterator other, IteratorEnvironment env) {
+ source = other.getSource().deepCopy(env);
+ // Set a default KeyParser
+ keyParser = createDefaultKeyParser();
+ }
+
+ // -------------------------------------------------------------------------
+ // ------------- Overrides
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.source = super.getSource(); // assign to the field, not the parameter
+ }
+
+ @Override
+ protected void setSource(SortedKeyValueIterator<Key,Value> source) {
+ this.source = source;
+ }
+
+ @Override
+ protected SortedKeyValueIterator<Key,Value> getSource() {
+ return source;
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new UniqFieldNameValueIterator(this, env);
+ }
+
+ @Override
+ public Key getTopKey() {
+ return this.topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return this.topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return (topKey != null);
+ }
+
+ @Override
+ public void next() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("next()");
+ }
+ if (!source.hasTop()) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+
+ Key currentKey = topKey;
+ keyParser.parse(topKey);
+ String fValue = keyParser.getFieldValue();
+
+ Text currentRow = currentKey.getRow();
+ Text currentFam = currentKey.getColumnFamily();
+
+ if (overallRange.getEndKey() != null && overallRange.getEndKey().getRow().compareTo(currentRow) < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("next, overall endRow: " + overallRange.getEndKey().getRow() + " currentRow: " + currentRow);
+ }
+ topKey = null;
+ topValue = null;
+ return;
+ }
+
+ if (fValue.compareTo(this.fieldValueUpperBound.toString()) > 0) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
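+ // Appending \1 to the current field value makes the seek key sort just past
+ // every key with this exact value, so the next distinct value for this
+ // fieldName is reached without walking duplicate entries.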
+ Key followingKey = new Key(currentKey.getRow(), this.fieldName, new Text(fValue + ONE_BYTE));
+ if (log.isDebugEnabled()) {
+ log.debug("next, followingKey to seek on: " + followingKey);
+ }
+ Range r = new Range(followingKey, followingKey);
+ source.seek(r, EMPTY_COL_FAMS, false);
+ while (true) {
+ if (!source.hasTop()) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+
+ Key k = source.getTopKey();
+ if (!overallRange.contains(k)) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("next(), key: " + k + " subrange: " + this.currentSubRange);
+ }
+ // if (this.currentSubRange.contains(k)) {
+ keyParser.parse(k);
+ Text currentVal = new Text(keyParser.getFieldValue());
+ if (k.getRow().equals(currentRow) && k.getColumnFamily().equals(currentFam) && currentVal.compareTo(fieldValueUpperBound) <= 0) {
+ topKey = k;
+ topValue = source.getTopValue();
+ return;
+
+ } else { // need to move to next row.
+ if (this.overallRange.contains(k) && this.multiRow) {
+ // need to find the next sub range
+ // STEPS
+ // 1. check if you moved past your current row on last call to next
+ // 2. figure out next row
+ // 3. build new start key with lowerbound fvalue
+ // 4. seek the source
+ // 5. test the subrange.
+ if (k.getRow().equals(currentRow)) {
+ // get next row
+ currentRow = getNextRow();
+ if (currentRow == null) {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ } else {
+ // We're already in the next row.
+ currentRow = source.getTopKey().getRow();
+ }
+
+ // build new startKey
+ Key sKey = new Key(currentRow, fieldName, fieldValueLowerBound);
+ Key eKey = new Key(currentRow, fieldName, fieldValueUpperBound);
+ currentSubRange = new Range(sKey, eKey);
+ source.seek(currentSubRange, EMPTY_COL_FAMS, seekInclusive);
+
+ } else { // not multi-row or outside overall range, we're done
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ }
+
+ }
+
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("seek, range: " + range);
+ }
+ this.overallRange = range;
+ this.seekInclusive = inclusive;
+ source.seek(range, EMPTY_COL_FAMS, inclusive);
+ topKey = null;
+ topValue = null;
+ Key sKey;
+ Key eKey;
+
+ if (range.isInfiniteStartKey()) {
+ sKey = source.getTopKey();
+ if (sKey == null) {
+ return;
+ }
+ } else {
+ sKey = range.getStartKey();
+ }
+
+ if (range.isInfiniteStopKey()) {
+ eKey = null;
+ this.multiRow = true; // assume we will go to the end of the tablet.
+ } else {
+ eKey = range.getEndKey();
+ if (sKey.getRow().equals(eKey.getRow())) {
+ this.multiRow = false;
+ } else {
+ this.multiRow = true;
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("seek, multiRow:" + multiRow + " range:" + range);
+ }
+
+ /*
+ * NOTE: If the seek range spans multiple rows, we are only interested in the fieldName:fieldValue subranges in each row. Keys will exist in the
+ * overallRange that we want to skip over, so we create subranges per row and avoid examining every key in between.
+ */
+
+ Text sRow = sKey.getRow();
+ Key ssKey = new Key(sRow, this.fieldName, this.fieldValueLowerBound);
+ Key eeKey = new Key(sRow, this.fieldName, this.fieldValueUpperBound);
+ this.currentSubRange = new Range(ssKey, eeKey);
+
+ if (log.isDebugEnabled()) {
+ log.debug("seek, currentSubRange: " + currentSubRange);
+ }
+ source.seek(this.currentSubRange, columnFamilies, inclusive);
+ // Cycle until we find a valid topKey, or we get ejected because we hit the
+ // end of the tablet or exceeded the overallRange.
+ while (topKey == null) {
+ if (source.hasTop()) {
+ Key k = source.getTopKey();
+ if (log.isDebugEnabled()) {
+ log.debug("seek, source.topKey: " + k);
+ }
+ if (currentSubRange.contains(k)) {
+ topKey = k;
+ topValue = source.getTopValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug("seek, source has top in valid range");
+ }
+
+ } else { // outside of subRange.
+ // if multiRow mode, get the next row and seek to it
+ if (multiRow && overallRange.contains(k)) {
+
+ Key fKey = sKey.followingKey(PartialKey.ROW);
+ Range fRange = new Range(fKey, eKey);
+ source.seek(fRange, columnFamilies, inclusive);
+
+ if (source.hasTop()) {
+ Text row = source.getTopKey().getRow();
+ Key nKey = new Key(row, this.fieldName, this.fieldValueLowerBound);
+ this.currentSubRange = new Range(nKey, eKey);
+ sKey = this.currentSubRange.getStartKey();
+ Range nextRange = new Range(sKey, eKey);
+ source.seek(nextRange, columnFamilies, inclusive);
+ } else {
+ topKey = null;
+ topValue = null;
+ return;
+ }
+
+ } else { // not multi row & outside range, we're done.
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ }
+ } else { // source does not have top, we're done
+ topKey = null;
+ topValue = null;
+ return;
+ }
+ }
+
+ }
+
+ // -------------------------------------------------------------------------
+ // ------------- Internal Methods
+ private FieldIndexKeyParser createDefaultKeyParser() {
+ FieldIndexKeyParser parser = new FieldIndexKeyParser();
+ return parser;
+ }
+
+ private Text getNextRow() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("getNextRow()");
+ }
+ Key fakeKey = new Key(source.getTopKey().followingKey(PartialKey.ROW));
+ Range fakeRange = new Range(fakeKey, fakeKey);
+ source.seek(fakeRange, EMPTY_COL_FAMS, false);
+ if (source.hasTop()) {
+ return source.getTopKey().getRow();
+ } else {
+ return null;
+ }
+ }
+}
Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
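
To picture the per-row subranges that seek() and next() build above, here is a
small sketch (the row, column family, and bounds are made-up values, not taken
from this commit):

    Text row = new Text("partition_03");                    // hypothetical partition row
    Text fieldName = new Text("fi\0TITLE");                 // hypothetical field-index column family
    Key sKey = new Key(row, fieldName, new Text("apple"));  // fieldValueLowerBound
    Key eKey = new Key(row, fieldName, new Text("apricot"));// fieldValueUpperBound
    Range subRange = new Range(sKey, eKey);

One such Range is computed per row, so the iterator seeks straight to the
fieldName/fieldValue slice of each row instead of examining every key in the
overall range.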
Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java Fri Jan 6 22:02:09 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.jexl;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.jexl2.JexlArithmetic;
+import org.apache.commons.lang.math.NumberUtils;
+
+public class Arithmetic extends JexlArithmetic {
+
+ public Arithmetic(boolean lenient) {
+ super(lenient);
+ }
+
+ /**
+ * This method differs from the parent in that we are not calling String.matches() because it does not match on a newline. Instead we are handling this case.
+ *
+ * @param left
+ * first value
+ * @param right
+ * second value
+ * @return test result.
+ */
+ @Override
+ public boolean matches(Object left, Object right) {
+ if (left == null && right == null) {
+ // if both are null L == R
+ return true;
+ }
+ if (left == null || right == null) {
+ // we know both aren't null, therefore L != R
+ return false;
+ }
+ final String arg = left.toString();
+ if (right instanceof java.util.regex.Pattern) {
+ return ((java.util.regex.Pattern) right).matcher(arg).matches();
+ } else {
+ // return arg.matches(right.toString());
+ Pattern p = Pattern.compile(right.toString(), Pattern.DOTALL);
+ Matcher m = p.matcher(arg);
+ return m.matches();
+
+ }
+ }
+
+ /**
+ * This method differs from the parent class in that we try to do a better job of coercing the types. As a last resort we fall back to a string
+ * comparison and try not to throw a NumberFormatException. The JexlArithmetic class performs coercion to a particular type if either the left or the right
+ * operand matches a known type. We look at the type of the right operand and try to convert the left operand to the same type.
+ */
+ @Override
+ public boolean equals(Object left, Object right) {
+ Object fixedLeft = fixLeft(left, right);
+ return super.equals(fixedLeft, right);
+ }
+
+ @Override
+ public boolean lessThan(Object left, Object right) {
+ Object fixedLeft = fixLeft(left, right);
+ return super.lessThan(fixedLeft, right);
+ }
+
+ protected Object fixLeft(Object left, Object right) {
+
+ if (null == left || null == right)
+ return left;
+
+ if (!(right instanceof Number) && left instanceof Number) {
+ right = NumberUtils.createNumber(right.toString());
+ }
+
+ if (right instanceof Number && left instanceof Number) {
+ if (right instanceof Double)
+ return ((Double) right).doubleValue();
+ else if (right instanceof Float)
+ return ((Float) right).floatValue();
+ else if (right instanceof Long)
+ return ((Long) right).longValue();
+ else if (right instanceof Integer)
+ return ((Integer) right).intValue();
+ else if (right instanceof Short)
+ return ((Short) right).shortValue();
+ else if (right instanceof Byte)
+ return ((Byte) right).byteValue();
+ else
+ return right;
+ }
+ if (right instanceof Number && left instanceof String) {
+ Number num = NumberUtils.createNumber(left.toString());
+ // Let's try to cast left as right's type.
+ if (this.isFloatingPointNumber(right) && this.isFloatingPointNumber(left))
+ return num;
+ else if (this.isFloatingPointNumber(right))
+ return num.doubleValue();
+ else if (right instanceof Number)
+ return num.longValue();
+ } else if (right instanceof Boolean && left instanceof String) {
+ if (left.equals("true") || left.equals("false"))
+ return Boolean.parseBoolean(left.toString());
+
+ Number num = NumberUtils.createNumber(left.toString());
+ if (num.intValue() == 1)
+ return (Boolean) true;
+ else if (num.intValue() == 0)
+ return (Boolean) false;
+ }
+ return left;
+ }
+
+}
Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java
------------------------------------------------------------------------------
svn:eol-style = native
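
The two overrides above are easiest to see with a few illustrative calls (a
sketch assuming a lenient engine; the literals are made up):

    Arithmetic arith = new Arithmetic(true);

    // matches(): Pattern.DOTALL lets '.*' span the embedded newline,
    // which plain String.matches() would not.
    arith.matches("line one\nline two", "line.*two"); // true

    // equals()/lessThan(): the String on the left is coerced to the
    // numeric type of the right operand before comparing.
    arith.equals("10", Long.valueOf(10));             // true
    arith.lessThan("9", Long.valueOf(10));            // true - numeric, not lexicographic

Only the left operand is ever adjusted; in the evaluating iterators the left
side is presumably the string field value pulled from the event, while the
right side is the typed literal from the query.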
Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java Fri Jan 6 22:02:09 2012
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.logic;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.wikisearch.ingest.WikipediaMapper;
+import org.apache.accumulo.wikisearch.iterator.BooleanLogicIterator;
+import org.apache.accumulo.wikisearch.iterator.EvaluatingIterator;
+import org.apache.accumulo.wikisearch.iterator.OptimizedQueryIterator;
+import org.apache.accumulo.wikisearch.iterator.ReadAheadIterator;
+import org.apache.accumulo.wikisearch.normalizer.LcNoDiacriticsNormalizer;
+import org.apache.accumulo.wikisearch.normalizer.Normalizer;
+import org.apache.accumulo.wikisearch.parser.EventFields;
+import org.apache.accumulo.wikisearch.parser.EventFields.FieldValue;
+import org.apache.accumulo.wikisearch.parser.FieldIndexQueryReWriter;
+import org.apache.accumulo.wikisearch.parser.JexlOperatorConstants;
+import org.apache.accumulo.wikisearch.parser.QueryParser;
+import org.apache.accumulo.wikisearch.parser.QueryParser.QueryTerm;
+import org.apache.accumulo.wikisearch.parser.RangeCalculator;
+import org.apache.accumulo.wikisearch.sample.Document;
+import org.apache.accumulo.wikisearch.sample.Field;
+import org.apache.accumulo.wikisearch.sample.Results;
+import org.apache.commons.jexl2.parser.ParserTreeConstants;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * <pre>
+ * <h2>Overview</h2>
+ * Query implementation that works with the JEXL grammar. This
+ * uses the metadata, global index, and partitioned table to return
+ * results based on the query. Example queries:
+ *
+ * <b>Single Term Query</b>
+ * 'foo' - looks in global index for foo, and if any entries are found, then the query
+ * is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
+ * down the optimized query path which uses the intersecting iterators on the partitioned
+ * table.
+ *
+ * <b>Boolean expression</b>
+ * field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
+ * the query is parsed and the set of eventFields in the query that are indexed is determined by
+ * querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
+ * eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
+ *
+ * We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
+ *
+ * ==, !=, >, >=, <, <=, =~, and !~
+ *
+ * Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
+ * with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
+ * example using this function is: "f:between(LATITUDE, 60.0, 70.0)"
+ *
+ * <h2>Constraints on Query Structure</h2>
+ * Queries sent to this class must be formatted with a space on either side of each operator, because the query is
+ * rewritten in some cases and the current implementation expects that spacing. If an error occurs during
+ * evaluation, the event is skipped.
+ *
+ * <h2>Notes on Optimization</h2>
+ * Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
+ *
+ * 1. An 'or' conjunction exists in the query but not all of the terms are indexed.
+ * 2. No indexed terms exist in the query.
+ * 3. An unsupported operator exists in the query.
+ *
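+ * <b>Example (illustrative; TITLE, TEXT, COMMENT are hypothetical fields)</b>
+ * 'apple'                              - rewritten to TITLE == 'apple' or TEXT == 'apple' when the
+ *                                        global index finds 'apple' under those fields
+ * TITLE == 'apple' and TEXT == 'fruit' - optimized path (intersecting iterators), both terms indexed
+ * TITLE == 'apple' and COMMENT == 'x'  - still optimized if COMMENT is unindexed (no 'or', one indexed term)
+ * TITLE == 'apple' or COMMENT == 'x'   - full scan, since not all or'd terms are indexed
+ *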
+ * </pre>
+ *
+ */
+public abstract class AbstractQueryLogic {
+
+ protected static Logger log = Logger.getLogger(AbstractQueryLogic.class);
+
+ /**
+ * Set of datatypes to limit the query to.
+ */
+ public static final String DATATYPE_FILTER_SET = "datatype.filter.set";
+
+ private static class DoNotPerformOptimizedQueryException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * Object that is used to hold ranges found in the index. Subclasses may compute the final range set in various ways.
+ */
+ public static abstract class IndexRanges {
+
+ private Map<String,String> indexValuesToOriginalValues = null;
+ private Multimap<String,String> fieldNamesAndValues = HashMultimap.create();
+ private Map<String,Long> termCardinality = new HashMap<String,Long>();
+ protected Map<String,TreeSet<Range>> ranges = new HashMap<String,TreeSet<Range>>();
+
+ public Multimap<String,String> getFieldNamesAndValues() {
+ return fieldNamesAndValues;
+ }
+
+ public void setFieldNamesAndValues(Multimap<String,String> fieldNamesAndValues) {
+ this.fieldNamesAndValues = fieldNamesAndValues;
+ }
+
+ public final Map<String,Long> getTermCardinality() {
+ return termCardinality;
+ }
+
+ public Map<String,String> getIndexValuesToOriginalValues() {
+ return indexValuesToOriginalValues;
+ }
+
+ public void setIndexValuesToOriginalValues(Map<String,String> indexValuesToOriginalValues) {
+ this.indexValuesToOriginalValues = indexValuesToOriginalValues;
+ }
+
+ public abstract void add(String term, Range r);
+
+ public abstract Set<Range> getRanges();
+ }
+
+ /**
+ * Object that computes the ranges by unioning all of the ranges for all of the terms together. In the case where ranges overlap, the largest range is used.
+ */
+ public static class UnionIndexRanges extends IndexRanges {
+
+ public static String DEFAULT_KEY = "default";
+
+ public UnionIndexRanges() {
+ this.ranges.put(DEFAULT_KEY, new TreeSet<Range>());
+ }
+
+ public Set<Range> getRanges() {
+ // So the set of ranges is ordered. It *should* be the case that
+ // ranges with partition ids will sort before ranges that point to
+ // a specific event. Populate a new set of ranges but don't add a
+ // range for an event where that range is contained in a range already
+ // added.
+ Set<Text> shardsAdded = new HashSet<Text>();
+ Set<Range> returnSet = new HashSet<Range>();
+ for (Range r : ranges.get(DEFAULT_KEY)) {
+ if (!shardsAdded.contains(r.getStartKey().getRow())) {
+ // Only add ranges with a start key for the entire partition.
+ if (r.getStartKey().getColumnFamily() == null) {
+ shardsAdded.add(r.getStartKey().getRow());
+ }
+ returnSet.add(r);
+ } else {
+ // if (log.isTraceEnabled())
+ log.info("Skipping event specific range: " + r.toString() + " because range has already been added: "
+ + shardsAdded.contains(r.getStartKey().getRow()));
+ }
+ }
+ return returnSet;
+ }
+
+ public void add(String term, Range r) {
+ ranges.get(DEFAULT_KEY).add(r);
+ }
+ }
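+
+ // Example (illustrative): if DEFAULT_KEY holds a range covering all of row
+ // "shard_07" (start key with no column family) followed by an event-specific
+ // range inside "shard_07", getRanges() returns only the row-wide range; the
+ // event range is skipped because that row was already added.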
+
+ private String metadataTableName;
+ private String indexTableName;
+ private String reverseIndexTableName;
+ private String tableName;
+ private int queryThreads = 8;
+ private String readAheadQueueSize;
+ private String readAheadTimeOut;
+ private boolean useReadAheadIterator;
+ private Kryo kryo = new Kryo();
+ private EventFields eventFields = new EventFields();
+ private List<String> unevaluatedFields = null;
+ private int numPartitions = 0;
+ private Map<Class<? extends Normalizer>,Normalizer> normalizerCacheMap = new HashMap<Class<? extends Normalizer>,Normalizer>();
+ private static final String NULL_BYTE = "\u0000";
+
+ public AbstractQueryLogic() {
+ super();
+ EventFields.initializeKryo(kryo);
+ }
+
+ /**
+ * Queries metadata table to determine which terms are indexed.
+ *
+ * @param c
+ * @param auths
+ * @param queryLiterals
+ * @param datatypes
+ * - optional list of types
+ * @return map of indexed field names to types to normalizers
+ * @throws TableNotFoundException
+ * @throws IllegalAccessException
+ * @throws InstantiationException
+ */
+ protected Map<String,Multimap<String,Class<? extends Normalizer>>> findIndexedTerms(Connector c, Authorizations auths, Set<String> queryLiterals,
+ Set<String> datatypes) throws TableNotFoundException, InstantiationException, IllegalAccessException {
+
+ Map<String,Multimap<String,Class<? extends Normalizer>>> results = new HashMap<String,Multimap<String,Class<? extends Normalizer>>>();
+
+ for (String literal : queryLiterals) {
+ if (log.isDebugEnabled())
+ log.debug("Querying " + this.getMetadataTableName() + " table for " + literal);
+ Range range = new Range(literal.toUpperCase());
+ Scanner scanner = c.createScanner(this.getMetadataTableName(), auths);
+ scanner.setRange(range);
+ scanner.fetchColumnFamily(new Text(WikipediaMapper.METADATA_INDEX_COLUMN_FAMILY));
+ for (Entry<Key,Value> entry : scanner) {
+ if (!results.containsKey(literal)) {
+ Multimap<String,Class<? extends Normalizer>> m = HashMultimap.create();
+ results.put(literal, m);
+ }
+ // Get the column qualifier from the key. It contains the datatype and normalizer class
+ String colq = entry.getKey().getColumnQualifier().toString();
+ if (null != colq && colq.contains("\0")) {
+ int idx = colq.indexOf("\0");
+ if (idx != -1) {
+ String type = colq.substring(0, idx);
+ // If types are specified and this type is not in the list then skip it.
+ if (null != datatypes && !datatypes.contains(type))
+ continue;
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends Normalizer> clazz = (Class<? extends Normalizer>) Class.forName(colq.substring(idx + 1));
+ if (!normalizerCacheMap.containsKey(clazz))
+ normalizerCacheMap.put(clazz, clazz.newInstance());
+ results.get(literal).put(type, clazz);
+ } catch (ClassNotFoundException e) {
+ log.error("Unable to find normalizer on class path: " + colq.substring(idx + 1), e);
+ results.get(literal).put(type, LcNoDiacriticsNormalizer.class);
+ }
+ } else {
+ log.warn("EventMetadata entry did not contain NULL byte: " + entry.getKey().toString());
+ }
+ } else {
+ log.warn("ColumnQualifier null in EventMetadata for key: " + entry.getKey().toString());
+ }
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug("METADATA RESULTS: " + results.toString());
+ return results;
+ }
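+
+ // Example (illustrative) of a metadata entry consumed above, for a field
+ // named TITLE in a hypothetical 'enwiki' datatype:
+ // row: TITLE colf: <METADATA_INDEX_COLUMN_FAMILY>
+ // colq: enwiki\0org.apache.accumulo.wikisearch.normalizer.LcNoDiacriticsNormalizer
+ // i.e. the column qualifier carries the datatype and the normalizer class,
+ // separated by a null byte.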
+
+ /**
+ * Performs a lookup in the global index for a single non-fielded term.
+ *
+ * @param c
+ * @param auths
+ * @param value
+ * @param datatypes
+ * - optional list of types
+ * @return ranges found in the global index for the term.
+ */
+ protected abstract IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> datatypes) throws TableNotFoundException;
+
+ /**
+ * Performs a lookup in the global index / reverse index and returns a RangeCalculator
+ *
+ * @param c
+ * Accumulo connection
+ * @param auths
+ * authset for queries
+ * @param indexedTerms
+ * multimap of indexed field name and Normalizers used
+ * @param terms
+ * multimap of field name and QueryTerm object
+ * @param indexTableName
+ * @param reverseIndexTableName
+ * @param queryString
+ * original query string
+ * @param queryThreads
+ * @param datatypes
+ * - optional list of types
+ * @return range calculator
+ * @throws TableNotFoundException
+ */
+ protected abstract RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
+ Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> datatypes)
+ throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException;
+
+ protected abstract Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms);
+
+ public String getMetadataTableName() {
+ return metadataTableName;
+ }
+
+ public String getIndexTableName() {
+ return indexTableName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setMetadataTableName(String metadataTableName) {
+ this.metadataTableName = metadataTableName;
+ }
+
+ public void setIndexTableName(String indexTableName) {
+ this.indexTableName = indexTableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public int getQueryThreads() {
+ return queryThreads;
+ }
+
+ public void setQueryThreads(int queryThreads) {
+ this.queryThreads = queryThreads;
+ }
+
+ public String getReadAheadQueueSize() {
+ return readAheadQueueSize;
+ }
+
+ public String getReadAheadTimeOut() {
+ return readAheadTimeOut;
+ }
+
+ public boolean isUseReadAheadIterator() {
+ return useReadAheadIterator;
+ }
+
+ public void setReadAheadQueueSize(String readAheadQueueSize) {
+ this.readAheadQueueSize = readAheadQueueSize;
+ }
+
+ public void setReadAheadTimeOut(String readAheadTimeOut) {
+ this.readAheadTimeOut = readAheadTimeOut;
+ }
+
+ public void setUseReadAheadIterator(boolean useReadAheadIterator) {
+ this.useReadAheadIterator = useReadAheadIterator;
+ }
+
+ public String getReverseIndexTableName() {
+ return reverseIndexTableName;
+ }
+
+ public void setReverseIndexTableName(String reverseIndexTableName) {
+ this.reverseIndexTableName = reverseIndexTableName;
+ }
+
+ public List<String> getUnevaluatedFields() {
+ return unevaluatedFields;
+ }
+
+ public void setUnevaluatedFields(List<String> unevaluatedFields) {
+ this.unevaluatedFields = unevaluatedFields;
+ }
+
+ public void setUnevaluatedFields(String unevaluatedFieldList) {
+ this.unevaluatedFields = new ArrayList<String>();
+ for (String field : unevaluatedFieldList.split(","))
+ this.unevaluatedFields.add(field);
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setNumPartitions(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ public Document createDocument(Key key, Value value) {
+ eventFields.clear();
+ ByteBuffer buf = ByteBuffer.wrap(value.get());
+ eventFields.readObjectData(kryo, buf);
+
+ Document doc = new Document();
+ // Set the id to the document id which is located in the colf
+ String row = key.getRow().toString();
+ String colf = key.getColumnFamily().toString();
+ int idx = colf.indexOf(NULL_BYTE);
+ String type = colf.substring(0, idx);
+ String id = colf.substring(idx + 1);
+ doc.setId(id);
+ for (Entry<String,Collection<FieldValue>> entry : eventFields.asMap().entrySet()) {
+ for (FieldValue fv : entry.getValue()) {
+ Field val = new Field();
+ val.setFieldName(entry.getKey());
+ val.setFieldValue(new String(fv.getValue(), Charset.forName("UTF-8")));
+ doc.getFields().add(val);
+ }
+ }
+
+ // Add the pointer for the content.
+ Field docPointer = new Field();
+ docPointer.setFieldName("DOCUMENT");
+ docPointer.setFieldValue("DOCUMENT:" + row + "/" + type + "/" + id);
+ doc.getFields().add(docPointer);
+
+ return doc;
+ }
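+
+ // Example (illustrative): for a column family of "enwiki\0AB12CD", type is
+ // "enwiki", id is "AB12CD", and the content pointer field becomes
+ // "DOCUMENT:<row>/enwiki/AB12CD".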
+
+ public String getResultsKey(Entry<Key,Value> key) {
+ // Use the colf from the table, it contains the uuid and datatype
+ return key.getKey().getColumnFamily().toString();
+ }
+
+ public Results runQuery(Connector connector, List<String> authorizations, String query, Date beginDate, Date endDate, Set<String> types) {
+
+ if (StringUtils.isEmpty(query)) {
+ throw new IllegalArgumentException("NULL QueryNode reference passed to " + this.getClass().getSimpleName());
+ }
+
+ Set<Range> ranges = new HashSet<Range>();
+ Set<String> typeFilter = types;
+ String[] array = authorizations.toArray(new String[0]);
+ Authorizations auths = new Authorizations(array);
+ Results results = new Results();
+
+ // Get the query string
+ String queryString = query;
+
+ StopWatch abstractQueryLogic = new StopWatch();
+ StopWatch optimizedQuery = new StopWatch();
+ StopWatch queryGlobalIndex = new StopWatch();
+ StopWatch optimizedEventQuery = new StopWatch();
+ StopWatch fullScanQuery = new StopWatch();
+ StopWatch processResults = new StopWatch();
+
+ abstractQueryLogic.start();
+
+ StopWatch parseQuery = new StopWatch();
+ parseQuery.start();
+
+ QueryParser parser;
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("ShardQueryLogic calling QueryParser.execute");
+ }
+ parser = new QueryParser();
+ parser.execute(queryString);
+ } catch (org.apache.commons.jexl2.parser.ParseException e1) {
+ throw new IllegalArgumentException("Error parsing query", e1);
+ }
+ int hash = parser.getHashValue();
+ parseQuery.stop();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Query: " + queryString);
+ }
+
+ Set<String> fields = new HashSet<String>();
+ for (String f : parser.getQueryIdentifiers()) {
+ fields.add(f);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString());
+ }
+ // Remove any negated fields from the fields list, we don't want to lookup negated fields
+ // in the index.
+ fields.removeAll(parser.getNegatedTermsForOptimizer());
+
+ if (log.isDebugEnabled()) {
+ log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString());
+ }
+ // Get the mapping of field name to QueryTerm object from the query. The query term object
+ // contains the operator, whether it's negated or not, and the literal to test against.
+ Multimap<String,QueryTerm> terms = parser.getQueryTerms();
+
+ // Find out which terms are indexed
+ // TODO: Should we cache indexed terms or does that not make sense since we are always
+ // loading data.
+ StopWatch queryMetadata = new StopWatch();
+ queryMetadata.start();
+ Map<String,Multimap<String,Class<? extends Normalizer>>> metadataResults;
+ try {
+ metadataResults = findIndexedTerms(connector, auths, fields, typeFilter);
+ } catch (Exception e1) {
+ throw new RuntimeException("Error in metadata lookup", e1);
+ }
+
+ // Create a map of indexed term to set of normalizers for it
+ Multimap<String,Normalizer> indexedTerms = HashMultimap.create();
+ for (Entry<String,Multimap<String,Class<? extends Normalizer>>> entry : metadataResults.entrySet()) {
+ // Get the normalizer from the normalizer cache
+ for (Class<? extends Normalizer> clazz : entry.getValue().values()) {
+ indexedTerms.put(entry.getKey(), normalizerCacheMap.get(clazz));
+ }
+ }
+ queryMetadata.stop();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Indexed Terms: " + indexedTerms.toString());
+ }
+
+ Set<String> orTerms = parser.getOrTermsForOptimizer();
+
+ // Iterate over the query terms to get the operators specified in the query.
+ ArrayList<String> unevaluatedExpressions = new ArrayList<String>();
+ boolean unsupportedOperatorSpecified = false;
+ for (Entry<String,QueryTerm> entry : terms.entries()) {
+ if (null == entry.getValue()) {
+ continue;
+ }
+
+ if (null != this.unevaluatedFields && this.unevaluatedFields.contains(entry.getKey().trim())) {
+ unevaluatedExpressions.add(entry.getKey().trim() + " " + entry.getValue().getOperator() + " " + entry.getValue().getValue());
+ }
+
+ int operator = JexlOperatorConstants.getJJTNodeType(entry.getValue().getOperator());
+ if (!(operator == ParserTreeConstants.JJTEQNODE || operator == ParserTreeConstants.JJTNENODE || operator == ParserTreeConstants.JJTLENODE
+ || operator == ParserTreeConstants.JJTLTNODE || operator == ParserTreeConstants.JJTGENODE || operator == ParserTreeConstants.JJTGTNODE || operator == ParserTreeConstants.JJTERNODE)) {
+ unsupportedOperatorSpecified = true;
+ break;
+ }
+ }
+ if (null != unevaluatedExpressions)
+ unevaluatedExpressions.trimToSize();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " unsupportedOperators: " + unsupportedOperatorSpecified + " indexedTerms: " + indexedTerms.toString() + " orTerms: "
+ + orTerms.toString() + " unevaluatedExpressions: " + unevaluatedExpressions.toString());
+ }
+
+ // We can use the intersecting iterator over the field index as an optimization under the
+ // following conditions
+ //
+ // 1. No unsupported operators in the query.
+ // 2. No 'or' operators and at least one term indexed
+ // or
+ // 1. No unsupported operators in the query.
+ // 2. and all terms indexed
+ // or
+ // 1. All or'd terms are indexed. NOTE, this will potentially skip some queries and push to a full table scan
+ // We should look into finding a better way to handle whether we do an optimized query or not.
+ boolean optimizationSucceeded = false;
+ boolean orsAllIndexed = false;
+ if (orTerms.isEmpty()) {
+ orsAllIndexed = false;
+ } else {
+ orsAllIndexed = indexedTerms.keySet().containsAll(orTerms);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("All or terms are indexed");
+ }
+
+ if (!unsupportedOperatorSpecified
+ && (((null == orTerms || orTerms.isEmpty()) && indexedTerms.size() > 0) || (fields.size() > 0 && indexedTerms.size() == fields.size()) || orsAllIndexed)) {
+ optimizedQuery.start();
+ // Set up intersecting iterator over field index.
+
+ // Get information from the global index for the indexed terms. The results object will contain the term
+ // mapped to an object that contains the total count, and partitions where this term is located.
+
+ // TODO: Should we cache indexed term information or does that not make sense since we are always loading data
+ queryGlobalIndex.start();
+ IndexRanges termIndexInfo;
+ try {
+ // If fields is empty, then it's probably the case that the user entered a value
+ // to search for with no fields. Check for the value in the index.
+ if (fields.isEmpty()) {
+ termIndexInfo = this.getTermIndexInformation(connector, auths, queryString, typeFilter);
+ if (null != termIndexInfo && termIndexInfo.getRanges().isEmpty()) {
+ // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
+ // in unhandled locations.
+ // Break out of here by throwing a named exception and do full scan
+ throw new DoNotPerformOptimizedQueryException();
+ }
+ // We need to rewrite the query string here so that it's valid.
+ if (termIndexInfo instanceof UnionIndexRanges) {
+ UnionIndexRanges union = (UnionIndexRanges) termIndexInfo;
+ StringBuilder buf = new StringBuilder();
+ String sep = "";
+ for (String fieldName : union.getFieldNamesAndValues().keySet()) {
+ buf.append(sep).append(fieldName).append(" == ");
+ if (!(queryString.startsWith("'") && queryString.endsWith("'"))) {
+ buf.append("'").append(queryString).append("'");
+ } else {
+ buf.append(queryString);
+ }
+ sep = " or ";
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Rewrote query for non-fielded single term query: " + queryString + " to " + buf.toString());
+ }
+ queryString = buf.toString();
+ } else {
+ throw new RuntimeException("Unexpected IndexRanges implementation");
+ }
+ } else {
+ RangeCalculator calc = this.getTermIndexInformation(connector, auths, indexedTerms, terms, this.getIndexTableName(), this.getReverseIndexTableName(),
+ queryString, this.queryThreads, typeFilter);
+ if (null == calc.getResult() || calc.getResult().isEmpty()) {
+ // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
+ // in unhandled locations.
+ // Break out of here by throwing a named exception and do full scan
+ throw new DoNotPerformOptimizedQueryException();
+ }
+ termIndexInfo = new UnionIndexRanges();
+ termIndexInfo.setIndexValuesToOriginalValues(calc.getIndexValues());
+ termIndexInfo.setFieldNamesAndValues(calc.getIndexEntries());
+ termIndexInfo.getTermCardinality().putAll(calc.getTermCardinalities());
+ for (Range r : calc.getResult()) {
+ // foo is a placeholder and is ignored.
+ termIndexInfo.add("foo", r);
+ }
+ }
+ } catch (TableNotFoundException e) {
+ log.error(this.getIndexTableName() + " not found", e);
+ throw new RuntimeException(this.getIndexTableName() + " not found", e);
+ } catch (org.apache.commons.jexl2.parser.ParseException e) {
+ throw new RuntimeException("Error determining ranges for query: " + queryString, e);
+ } catch (DoNotPerformOptimizedQueryException e) {
+ log.info("Indexed fields not found in index, performing full scan");
+ termIndexInfo = null;
+ }
+ queryGlobalIndex.stop();
+
+ // Determine if we should proceed with optimized query based on results from the global index
+ boolean proceed = false;
+ if (null == termIndexInfo || termIndexInfo.getFieldNamesAndValues().values().size() == 0) {
+ proceed = false;
+ } else if (null != orTerms && orTerms.size() > 0 && (termIndexInfo.getFieldNamesAndValues().values().size() == indexedTerms.size())) {
+ proceed = true;
+ } else if (termIndexInfo.getFieldNamesAndValues().values().size() > 0) {
+ proceed = true;
+ } else if (orsAllIndexed) {
+ proceed = true;
+ } else {
+ proceed = false;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Proceed with optimized query: " + proceed);
+ if (null != termIndexInfo)
+ log.debug("termIndexInfo.getTermsFound().size(): " + termIndexInfo.getFieldNamesAndValues().values().size() + " indexedTerms.size: "
+ + indexedTerms.size() + " fields.size: " + fields.size());
+ }
+ if (proceed) {
+
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Performing optimized query");
+ }
+ // Use the scan ranges from the IndexRanges object as the ranges for the batch scanner
+ ranges = termIndexInfo.getRanges();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
+ }
+
+ // Create BatchScanner, set the ranges, and setup the iterators.
+ optimizedEventQuery.start();
+ BatchScanner bs = null;
+ try {
+ bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
+ bs.setRanges(ranges);
+ IteratorSetting si = new IteratorSetting(21, "eval", OptimizedQueryIterator.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
+ }
+ // Set the query option
+ si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
+ // Set the Indexed Terms List option. Each entry is the field name, original value, and normalized
+ // value separated by colons; entries are separated by semicolons.
+ StringBuilder buf = new StringBuilder();
+ String sep = "";
+ for (Entry<String,String> entry : termIndexInfo.getFieldNamesAndValues().entries()) {
+ buf.append(sep);
+ buf.append(entry.getKey());
+ buf.append(":");
+ buf.append(termIndexInfo.getIndexValuesToOriginalValues().get(entry.getValue()));
+ buf.append(":");
+ buf.append(entry.getValue());
+ if (sep.equals("")) {
+ sep = ";";
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Setting scan option: " + FieldIndexQueryReWriter.INDEXED_TERMS_LIST + " to " + buf.toString());
+ }
+ FieldIndexQueryReWriter rewriter = new FieldIndexQueryReWriter();
+ String q = "";
+ try {
+ q = queryString;
+ q = rewriter.applyCaseSensitivity(q, true, false);// Set upper/lower case for fieldname/fieldvalue
+ Map<String,String> opts = new HashMap<String,String>();
+ opts.put(FieldIndexQueryReWriter.INDEXED_TERMS_LIST, buf.toString());
+ q = rewriter.removeNonIndexedTermsAndInvalidRanges(q, opts);
+ q = rewriter.applyNormalizedTerms(q, opts);
+ if (log.isDebugEnabled()) {
+ log.debug("runServerQuery, FieldIndex Query: " + q);
+ }
+ } catch (org.apache.commons.jexl2.parser.ParseException ex) {
+ log.error("Could not parse query, Jexl ParseException: " + ex);
+ } catch (Exception ex) {
+ log.error("Problem rewriting query, Exception: " + ex.getMessage());
+ }
+ si.addOption(BooleanLogicIterator.FIELD_INDEX_QUERY, q);
+
+ // Set the term cardinality option
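+ // Each entry is term:cardinality; entries are comma separated, e.g. DUMMY:1024,TITLE:37 (values illustrative)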
+ sep = "";
+ buf.delete(0, buf.length());
+ for (Entry<String,Long> entry : termIndexInfo.getTermCardinality().entrySet()) {
+ buf.append(sep);
+ buf.append(entry.getKey());
+ buf.append(":");
+ buf.append(entry.getValue());
+ sep = ",";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting scan option: " + BooleanLogicIterator.TERM_CARDINALITIES + " to " + buf.toString());
+ si.addOption(BooleanLogicIterator.TERM_CARDINALITIES, buf.toString());
+ if (this.useReadAheadIterator) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enabling read ahead iterator with queue size: " + this.readAheadQueueSize + " and timeout: " + this.readAheadTimeOut);
+ }
+ si.addOption(ReadAheadIterator.QUEUE_SIZE, this.readAheadQueueSize);
+ si.addOption(ReadAheadIterator.TIMEOUT, this.readAheadTimeOut);
+ }
+
+ if (null != unevaluatedExpressions) {
+ StringBuilder unevaluatedExpressionList = new StringBuilder();
+ String sep2 = "";
+ for (String exp : unevaluatedExpressions) {
+ unevaluatedExpressionList.append(sep2).append(exp);
+ sep2 = ",";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
+ si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
+ }
+
+ bs.addScanIterator(si);
+
+ processResults.start();
+ processResults.suspend();
+ long count = 0;
+ for (Entry<Key,Value> entry : bs) {
+ count++;
+ // The key that is returned by the EvaluatingIterator is not the same key that is in
+ // the table. The value that is returned by the EvaluatingIterator is a kryo
+ // serialized EventFields object.
+ processResults.resume();
+ Document d = this.createDocument(entry.getKey(), entry.getValue());
+ results.getResults().add(d);
+ processResults.suspend();
+ }
+ log.info(count + " matching entries found in optimized query.");
+ optimizationSucceeded = true;
+ processResults.stop();
+ } catch (TableNotFoundException e) {
+ log.error(this.getTableName() + " not found", e);
+ throw new RuntimeException(this.getTableName() + " not found", e);
+ } finally {
+ if (bs != null) {
+ bs.close();
+ }
+ }
+ optimizedEventQuery.stop();
+ }
+ optimizedQuery.stop();
+ }
+
+ // We should look into finding a better way to handle whether we do an optimized query or not.
+ // We are not setting up an else condition here because we may have aborted the logic early in the if statement.
+ if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()) && !orsAllIndexed)) {
+ fullScanQuery.start();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Performing full scan query");
+ }
+
+ // Set up a full scan using the date ranges from the query
+ // Create BatchScanner, set the ranges, and setup the iterators.
+ BatchScanner bs = null;
+ try {
+ // The ranges are the start and end dates
+ Collection<Range> r = getFullScanRange(beginDate, endDate, terms);
+ ranges.addAll(r);
+
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
+ }
+
+ bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
+ bs.setRanges(ranges);
+ IteratorSetting si = new IteratorSetting(22, "eval", EvaluatingIterator.class);
+ // Create datatype regex if needed
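+ // e.g. a type filter of {enwiki, eswiki} yields the regex enwiki.*|eswiki.* (types illustrative)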
+ if (null != typeFilter) {
+ StringBuilder buf = new StringBuilder();
+ String s = "";
+ for (String type : typeFilter) {
+ buf.append(s).append(type).append(".*");
+ s = "|";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting colf regex iterator to: " + buf.toString());
+ IteratorSetting ri = new IteratorSetting(21, "typeFilter", RegExFilter.class);
+ RegExFilter.setRegexs(ri, null, buf.toString(), null, null, false);
+ bs.addScanIterator(ri);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
+ }
+ si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
+ if (null != unevaluatedExpressions) {
+ StringBuilder unevaluatedExpressionList = new StringBuilder();
+ String sep2 = "";
+ for (String exp : unevaluatedExpressions) {
+ unevaluatedExpressionList.append(sep2).append(exp);
+ sep2 = ",";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
+ si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
+ }
+ bs.addScanIterator(si);
+ long count = 0;
+ processResults.start();
+ processResults.suspend();
+ for (Entry<Key,Value> entry : bs) {
+ count++;
+ // The key that is returned by the EvaluatingIterator is not the same key that is in
+ // the partition table. The value that is returned by the EvaluatingIterator is a kryo
+ // serialized EventFields object.
+ processResults.resume();
+ Document d = this.createDocument(entry.getKey(), entry.getValue());
+ results.getResults().add(d);
+ processResults.suspend();
+ }
+ processResults.stop();
+ log.info(count + " matching entries found in full scan query.");
+ } catch (TableNotFoundException e) {
+ log.error(this.getTableName() + " not found", e);
+ } finally {
+ if (bs != null) {
+ bs.close();
+ }
+ }
+ fullScanQuery.stop();
+ }
+
+ log.info("AbstractQueryLogic: " + queryString + " " + timeString(abstractQueryLogic.getTime()));
+ log.info(" 1) parse query " + timeString(parseQuery.getTime()));
+ log.info(" 2) query metadata " + timeString(queryMetadata.getTime()));
+ log.info(" 3) full scan query " + timeString(fullScanQuery.getTime()));
+ log.info(" 3) optimized query " + timeString(optimizedQuery.getTime()));
+ log.info(" 1) process results " + timeString(processResults.getTime()));
+ log.info(" 1) query global index " + timeString(queryGlobalIndex.getTime()));
+ log.info(hash + " Query completed.");
+
+ return results;
+ }
+
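+ // Formats elapsed milliseconds as seconds with two decimal places, e.g. 1234 -> "1.23".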
+ private static String timeString(long millis) {
+ return String.format("%4.2f", millis / 1000.);
+ }
+
+}
Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java Fri Jan 6 22:02:09 2012
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.logic;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.wikisearch.ingest.WikipediaMapper;
+import org.apache.accumulo.wikisearch.sample.Document;
+import org.apache.accumulo.wikisearch.sample.Field;
+import org.apache.accumulo.wikisearch.sample.Results;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This query table implementation returns a Results object that contains documents from the wiki table. The query will contain the partition id, wikitype, and
+ * UID so that we can seek directly to the document. The document is stored base64 encoded in the Accumulo table; we base64 decode the stored value and
+ * return the resulting data in the Results object.
+ *
+ * The query that needs to be passed to the web service is: DOCUMENT:partitionId/wikitype/uid.
+ *
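+ * For example, with hypothetical values: DOCUMENT:122/enwiki/4a5b6c7d
+ *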
+ */
+public class ContentLogic {
+
+ private static final Logger log = Logger.getLogger(ContentLogic.class);
+
+ private static final String NULL_BYTE = "\u0000";
+
+ private String tableName = null;
+
+ private Pattern queryPattern = Pattern.compile("^DOCUMENT:(.*)/(.*)/(.*)$");
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public Results runQuery(Connector connector, String query, List<String> authorizations) {
+
+ Results results = new Results();
+ Authorizations auths = new Authorizations(StringUtils.join(authorizations, "|"));
+
+ Matcher match = queryPattern.matcher(query);
+ if (!match.matches()) {
+ throw new IllegalArgumentException("Query does not match the pattern: DOCUMENT:partitionId/wikitype/uid, your query: " + query);
+ } else {
+ String partitionId = match.group(1);
+ String wikitype = match.group(2);
+ String id = match.group(3);
+
+ log.debug("Received pieces: " + partitionId + ", " + wikitype + ", " + id);
+
+ // Create the Range
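+ // The end key appends a null byte so the range covers exactly the one document entry.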
+ Key startKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype + NULL_BYTE + id);
+ Key endKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype + NULL_BYTE + id + NULL_BYTE);
+ Range r = new Range(startKey, true, endKey, false);
+
+ log.debug("Setting range: " + r);
+
+ try {
+ Scanner scanner = connector.createScanner(this.getTableName(), auths);
+ scanner.setRange(r);
+ // This should in theory only match one thing.
+ for (Entry<Key,Value> entry : scanner) {
+ Document doc = new Document();
+ doc.setId(id);
+ Field val = new Field();
+ val.setFieldName("DOCUMENT");
+ val.setFieldValue(new String(Base64.decodeBase64(entry.getValue().toString())));
+ doc.getFields().add(val);
+ results.getResults().add(doc);
+ }
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException("Table not found: " + this.getTableName(), e);
+ }
+
+ }
+ return results;
+ }
+
+}
Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java Fri Jan 6 22:02:09 2012
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.logic;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.wikisearch.iterator.EvaluatingIterator;
+import org.apache.accumulo.wikisearch.normalizer.LcNoDiacriticsNormalizer;
+import org.apache.accumulo.wikisearch.normalizer.Normalizer;
+import org.apache.accumulo.wikisearch.parser.QueryParser.QueryTerm;
+import org.apache.accumulo.wikisearch.parser.RangeCalculator;
+import org.apache.accumulo.wikisearch.protobuf.Uid;
+import org.apache.accumulo.wikisearch.util.TextUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Multimap;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * <pre>
+ * <h2>Overview</h2>
+ * QueryTable implementation that works with the JEXL grammar. This QueryTable
+ * uses the metadata, global index, and partitioned table to return
+ * results based on the query. Example queries:
+ *
+ * <b>Single Term Query</b>
+ * 'foo' - looks in global index for foo, and if any entries are found, then the query
+ * is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
+ * down the optimized query path which uses the intersecting iterators on the shard
+ * table.
+ *
+ * <b>Boolean expression</b>
+ * field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
+ * the query is parsed and the set of eventFields in the query that are indexed is determined by
+ * querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
+ * eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
+ *
+ * We do not support all of the operators that JEXL supports at this time. The supported operators are:
+ *
+ * ==, !=, >, >=, <, <=, =~, and !~
+ *
+ * Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
+ * with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
+ * example using this function is : "f:between(LATITUDE,60.0, 70.0)"
+ *
+ * <h2>Constraints on Query Structure</h2>
+ * Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
+ * rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. Users
+ * should also be aware that the literals used in the query need to match the data in the table. If an error occurs in the evaluation
+ * we are skipping the event.
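+ * For example, FIELD == 'foo' is handled as written, while FIELD=='foo' (no spaces around the operator) may not be rewritten correctly.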
+ *
+ * <h2>Notes on Optimization</h2>
+ * Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
+ *
+ * 1. An 'or' conjunction exists in the query but not all of the terms are indexed.
+ * 2. No indexed terms exist in the query
+ * 3. An unsupported operator exists in the query
+ *
+ * </pre>
+ *
+ */
+public class QueryLogic extends AbstractQueryLogic {
+
+ protected static Logger log = Logger.getLogger(QueryLogic.class);
+
+ private static final String startPartition = "0";
+
+ public QueryLogic() {
+ super();
+ }
+
+ @Override
+ protected RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
+ Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> typeFilter)
+ throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException {
+ RangeCalculator calc = new RangeCalculator();
+ calc.execute(c, auths, indexedTerms, terms, queryString, this, typeFilter);
+ return calc;
+ }
+
+ protected Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms) {
+ String startKey = startPartition;
+ String endKey = Integer.toString(this.getNumPartitions());
+ Range r = new Range(startKey, true, endKey, false);
+ return Collections.singletonList(r);
+ }
+
+ @Override
+ protected IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> typeFilter) throws TableNotFoundException {
+ final String dummyTermName = "DUMMY";
+ UnionIndexRanges indexRanges = new UnionIndexRanges();
+
+ // The entries in the index are normalized. Since we don't have a field name, just try the LcNoDiacriticsNormalizer.
+ String normalizedFieldValue = new LcNoDiacriticsNormalizer().normalizeFieldValue("", value);
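+ // e.g. "Détente" would normalize to "detente" (lower cased, diacritics removed)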
+ // Remove the begin and end ' marks
+ if (normalizedFieldValue.startsWith("'") && normalizedFieldValue.endsWith("'")) {
+ normalizedFieldValue = normalizedFieldValue.substring(1, normalizedFieldValue.length() - 1);
+ }
+ Text fieldValue = new Text(normalizedFieldValue);
+ if (log.isDebugEnabled()) {
+ log.debug("Querying index table : " + this.getIndexTableName() + " for normalized indexed term: " + fieldValue);
+ }
+ Scanner scanner = c.createScanner(this.getIndexTableName(), auths);
+ Range r = new Range(fieldValue);
+ scanner.setRange(r);
+ if (log.isDebugEnabled()) {
+ log.debug("Range for index query: " + r.toString());
+ }
+ for (Entry<Key,Value> entry : scanner) {
+ if (log.isDebugEnabled()) {
+ log.debug("Index entry: " + entry.getKey().toString());
+ }
+ // Get the shard id and datatype from the colq
+ String fieldName = entry.getKey().getColumnFamily().toString();
+ String colq = entry.getKey().getColumnQualifier().toString();
+ int separator = colq.indexOf(EvaluatingIterator.NULL_BYTE_STRING);
+ String shardId = null;
+ String datatype = null;
+ if (separator != -1) {
+ shardId = colq.substring(0, separator);
+ datatype = colq.substring(separator + 1);
+ } else {
+ shardId = colq;
+ }
+ // Skip this entry if the type is not correct
+ if (null != datatype && null != typeFilter && !typeFilter.contains(datatype))
+ continue;
+ // Parse the UID.List object from the value
+ Uid.List uidList = null;
+ try {
+ uidList = Uid.List.parseFrom(entry.getValue().get());
+ } catch (InvalidProtocolBufferException e) {
+ // Could not parse the UID list. We still know which shard the term is in, so fall back
+ // to scanning that entire shard instead of dereferencing the null list below.
+ indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);
+ indexRanges.add(dummyTermName, new Range(new Text(shardId)));
+ continue;
+ }
+
+ // Add the count for this shard to the total count for the term.
+ long count = 0;
+ Long storedCount = indexRanges.getTermCardinality().get(dummyTermName);
+ if (null == storedCount) {
+ count = uidList.getCOUNT();
+ } else {
+ count = uidList.getCOUNT() + storedCount;
+ }
+ indexRanges.getTermCardinality().put(dummyTermName, count);
+ // Add the field name
+ indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);
+
+ // Create the keys
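+ // e.g. shard "123", datatype "enwiki", uid "a1b2" yield the single-event range
+ // ["123 enwiki\0a1b2", "123 enwiki\0a1b2\0") (values illustrative)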
+ Text shard = new Text(shardId);
+ if (uidList.getIGNORE()) {
+ // Then we create a scan range that is the entire shard
+ indexRanges.add(dummyTermName, new Range(shard));
+ } else {
+ // We should have UIDs; create one event range per UID
+ for (String uuid : uidList.getUIDList()) {
+ Text cf = new Text(datatype);
+ TextUtil.textAppend(cf, uuid);
+ Key startKey = new Key(shard, cf);
+ Key endKey = new Key(shard, new Text(cf.toString() + EvaluatingIterator.NULL_BYTE_STRING));
+ Range eventRange = new Range(startKey, true, endKey, false);
+ indexRanges.add(dummyTermName, eventRange);
+ }
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Found " + indexRanges.getRanges().size() + " entries in the index for field value: " + normalizedFieldValue);
+ }
+ return indexRanges;
+
+ }
+
+}
Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java
------------------------------------------------------------------------------
svn:eol-style = native