You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/01/06 23:02:13 UTC

svn commit: r1228459 [9/13] - in /incubator/accumulo/branches/1.4: ./ contrib/accumulo_sample/ src/examples/src/main/java/org/apache/accumulo/examples/wikisearch/ src/trace/ src/wikisearch/ src/wikisearch/ingest/ src/wikisearch/ingest/bin/ src/wikisear...

Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java Fri Jan  6 22:02:09 2012
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.iterator;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * This iterator takes the source iterator (the one below it in the iterator stack) and puts it in a background thread. The background thread continues
+ * processing and fills a queue with the Keys and Values from the source iterator. When seek() is called on this iterator, it pauses the background thread,
+ * clears the queue, calls seek() on the source iterator, then resumes the thread filling the queue.
+ * 
+ * Users of this iterator can set the queue size, default is five elements. Users must be aware of the potential for OutOfMemory errors when using this iterator
+ * with large queue sizes or large objects. This iterator copies the Key and Value from the source iterator and puts them into the queue.
+ * 
+ * This iterator introduces some parallelism into the server side iterator stack. One use case for this would be when an iterator takes a relatively long time
+ * to process each K,V pair and causes the iterators above it to wait. By putting the longer running iterator in a background thread we should be able to
+ * achieve greater throughput.
+ * 
+ * NOTE: Experimental!
+ * 
+ */
+public class ReadAheadIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+  
+  private static Logger log = Logger.getLogger(ReadAheadIterator.class);
+  
+  public static final String QUEUE_SIZE = "queue.size";
+  
+  public static final String TIMEOUT = "timeout";
+  
+  private static final QueueElement noMoreDataElement = new QueueElement();
+  
+  private int queueSize = 5;
+  
+  private int timeout = 60;
+  
+  /**
+   * 
+   * Class to hold key and value from the producing thread.
+   * 
+   */
+  static class QueueElement {
+    Key key = null;
+    Value value = null;
+    
+    public QueueElement() {}
+    
+    public QueueElement(Key key, Value value) {
+      super();
+      this.key = new Key(key);
+      this.value = new Value(value.get(), true);
+    }
+    
+    public Key getKey() {
+      return key;
+    }
+    
+    public Value getValue() {
+      return value;
+    }
+  }
+  
+  /**
+   * 
+   * Thread that produces data from the source iterator and places the results in a queue.
+   * 
+   */
+  class ProducerThread extends ReentrantLock implements Runnable {
+    
+    private static final long serialVersionUID = 1L;
+    
+    private Exception e = null;
+    
+    private int waitTime = timeout;
+    
+    private SortedKeyValueIterator<Key,Value> sourceIter = null;
+    
+    public ProducerThread(SortedKeyValueIterator<Key,Value> source) {
+      this.sourceIter = source;
+    }
+    
+    public void run() {
+      boolean hasMoreData = true;
+      // Keep this thread running while there is more data to read
+      // and items left in the queue to be read off.
+      while (hasMoreData || queue.size() > 0) {
+        try {
+          // Acquire the lock, this will wait if the lock is being
+          // held by the ReadAheadIterator.seek() method.
+          this.lock();
+          // Check to see if there is more data from the iterator below.
+          hasMoreData = sourceIter.hasTop();
+          // Break out of the loop if no more data.
+          if (!hasMoreData)
+            continue;
+          // Put the next K,V onto the queue.
+          try {
+            QueueElement e = new QueueElement(sourceIter.getTopKey(), sourceIter.getTopValue());
+            boolean inserted = false;
+            try {
+              inserted = queue.offer(e, this.waitTime, TimeUnit.SECONDS);
+            } catch (InterruptedException ie) {
+              this.e = ie;
+              break;
+            }
+            if (!inserted) {
+              // Then we either got a timeout, set the error and break out of the loop
+              this.e = new TimeoutException("Background thread has exceeded wait time of " + this.waitTime + " seconds, aborting...");
+              break;
+            }
+            // Move the iterator to the next K,V for the next iteration of this loop
+            sourceIter.next();
+          } catch (Exception e) {
+            this.e = e;
+            log.error("Error calling next on source iterator", e);
+            break;
+          }
+        } finally {
+          this.unlock();
+        }
+      }
+      // If we broke out of the loop because of an error, then don't put the marker on the queue, just to do end.
+      if (!hasError()) {
+        // Put the special end of data marker into the queue
+        try {
+          queue.put(noMoreDataElement);
+        } catch (InterruptedException e) {
+          this.e = e;
+          log.error("Error putting End of Data marker onto queue");
+        }
+      }
+    }
+    
+    public boolean hasError() {
+      return (this.e != null);
+    }
+    
+    public Exception getError() {
+      return this.e;
+    }
+  }
+  
+  private SortedKeyValueIterator<Key,Value> source;
+  private ArrayBlockingQueue<QueueElement> queue = null;
+  private QueueElement currentElement = new QueueElement();
+  private ProducerThread thread = null;
+  private Thread t = null;
+  
+  protected ReadAheadIterator(ReadAheadIterator other, IteratorEnvironment env) {
+    source = other.source.deepCopy(env);
+  }
+  
+  public ReadAheadIterator() {}
+  
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new ReadAheadIterator(this, env);
+  }
+  
+  public Key getTopKey() {
+    return currentElement.getKey();
+  }
+  
+  public Value getTopValue() {
+    return currentElement.getValue();
+  }
+  
+  public boolean hasTop() {
+    if (currentElement == noMoreDataElement)
+      return false;
+    return currentElement != null || queue.size() > 0 || source.hasTop();
+  }
+  
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    validateOptions(options);
+    this.source = source;
+    queue = new ArrayBlockingQueue<QueueElement>(queueSize);
+    thread = new ProducerThread(this.source);
+    t = new Thread(thread, "ReadAheadIterator-SourceThread");
+    t.start();
+  }
+  
+  /**
+   * Populate the key and value
+   */
+  public void next() throws IOException {
+    // Thread startup race condition, need to make sure that the
+    // thread has started before we call this the first time.
+    while (t.getState().equals(State.NEW)) {
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException e) {}
+    }
+    
+    if (t.getState().equals(State.TERMINATED)) {
+      // Thread encountered an error.
+      if (thread.hasError()) {
+        // and it should
+        throw new IOException("Background thread has died", thread.getError());
+      }
+    }
+    
+    // Pull an element off the queue, this will wait if there is no data yet.
+    try {
+      if (thread.hasError())
+        throw new IOException("background thread has error", thread.getError());
+      
+      QueueElement nextElement = null;
+      while (null == nextElement) {
+        try {
+          nextElement = queue.poll(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          // TODO: Do we need to do anything here?
+        }
+        if (null == nextElement) {
+          // Then we have no data and timed out, check for error condition in the read ahead thread
+          if (thread.hasError()) {
+            throw new IOException("background thread has error", thread.getError());
+          }
+        }
+      }
+      currentElement = nextElement;
+    } catch (IOException e) {
+      throw new IOException("Error getting element from source iterator", e);
+    }
+  }
+  
+  /**
+   * Seek to the next matching cell and call next to populate the key and value.
+   */
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    if (t.isAlive()) {
+      // Check for error
+      if (thread.hasError())
+        throw new IOException("background thread has error", thread.getError());
+      
+      try {
+        // Acquire the lock, or wait until its unlocked by the producer thread.
+        thread.lock();
+        queue.clear();
+        currentElement = null;
+        source.seek(range, columnFamilies, inclusive);
+      } finally {
+        thread.unlock();
+      }
+      next();
+    } else {
+      throw new IOException("source iterator thread has died.");
+    }
+  }
+  
+  public IteratorOptions describeOptions() {
+    Map<String,String> options = new HashMap<String,String>();
+    options.put(QUEUE_SIZE, "read ahead queue size");
+    options.put(TIMEOUT, "timeout in seconds before background thread thinks that the client has aborted");
+    return new IteratorOptions(getClass().getSimpleName(), "Iterator that puts the source in another thread", options, null);
+  }
+  
+  public boolean validateOptions(Map<String,String> options) {
+    if (options.containsKey(QUEUE_SIZE))
+      queueSize = Integer.parseInt(options.get(QUEUE_SIZE));
+    if (options.containsKey(TIMEOUT))
+      timeout = Integer.parseInt(options.get(TIMEOUT));
+    return true;
+  }
+  
+}

Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/ReadAheadIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java Fri Jan  6 22:02:09 2012
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.iterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.wikisearch.util.FieldIndexKeyParser;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+
+public class UniqFieldNameValueIterator extends WrappingIterator {
+  
+  protected static final Logger log = Logger.getLogger(UniqFieldNameValueIterator.class);
+  // Wrapping iterator only accesses its private source in setSource and getSource
+  // Since this class overrides these methods, it's safest to keep the source declaration here
+  private SortedKeyValueIterator<Key,Value> source;
+  private FieldIndexKeyParser keyParser;
+  private Key topKey = null;
+  private Value topValue = null;
+  private Range overallRange = null;
+  private Range currentSubRange;
+  private Text fieldName = null;
+  private Text fieldValueLowerBound = null;
+  private Text fieldValueUpperBound = null;
+  private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
+  private static final String ONE_BYTE = "\1";
+  private boolean multiRow = false;
+  private boolean seekInclusive = false;
+  
+  // -------------------------------------------------------------------------
+  // ------------- Static Methods
+  public static void setLogLevel(Level l) {
+    log.setLevel(l);
+  }
+  
+  // -------------------------------------------------------------------------
+  // ------------- Constructors
+  public UniqFieldNameValueIterator(Text fName, Text fValLower, Text fValUpper) {
+    this.fieldName = fName;
+    this.fieldValueLowerBound = fValLower;
+    this.fieldValueUpperBound = fValUpper;
+    keyParser = createDefaultKeyParser();
+    
+  }
+  
+  public UniqFieldNameValueIterator(UniqFieldNameValueIterator other, IteratorEnvironment env) {
+    source = other.getSource().deepCopy(env);
+    // Set a default KeyParser
+    keyParser = createDefaultKeyParser();
+  }
+  
+  // -------------------------------------------------------------------------
+  // ------------- Overrides
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    source = super.getSource();
+  }
+  
+  @Override
+  protected void setSource(SortedKeyValueIterator<Key,Value> source) {
+    this.source = source;
+  }
+  
+  @Override
+  protected SortedKeyValueIterator<Key,Value> getSource() {
+    return source;
+  }
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new UniqFieldNameValueIterator(this, env);
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return this.topKey;
+  }
+  
+  @Override
+  public Value getTopValue() {
+    return this.topValue;
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return (topKey != null);
+  }
+  
+  @Override
+  public void next() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("next()");
+    }
+    if (!source.hasTop()) {
+      topKey = null;
+      topValue = null;
+      return;
+    }
+    
+    Key currentKey = topKey;
+    keyParser.parse(topKey);
+    String fValue = keyParser.getFieldValue();
+    
+    Text currentRow = currentKey.getRow();
+    Text currentFam = currentKey.getColumnFamily();
+    
+    if (overallRange.getEndKey() != null && overallRange.getEndKey().getRow().compareTo(currentRow) < 0) {
+      if (log.isDebugEnabled()) {
+        log.debug("next, overall endRow: " + overallRange.getEndKey().getRow() + "  currentRow: " + currentRow);
+      }
+      topKey = null;
+      topValue = null;
+      return;
+    }
+    
+    if (fValue.compareTo(this.fieldValueUpperBound.toString()) > 0) {
+      topKey = null;
+      topValue = null;
+      return;
+    }
+    Key followingKey = new Key(currentKey.getRow(), this.fieldName, new Text(fValue + ONE_BYTE));
+    if (log.isDebugEnabled()) {
+      log.debug("next, followingKey to seek on: " + followingKey);
+    }
+    Range r = new Range(followingKey, followingKey);
+    source.seek(r, EMPTY_COL_FAMS, false);
+    while (true) {
+      if (!source.hasTop()) {
+        topKey = null;
+        topValue = null;
+        return;
+      }
+      
+      Key k = source.getTopKey();
+      if (!overallRange.contains(k)) {
+        topKey = null;
+        topValue = null;
+        return;
+      }
+      if (log.isDebugEnabled()) {
+        log.debug("next(), key: " + k + " subrange: " + this.currentSubRange);
+      }
+      // if (this.currentSubRange.contains(k)) {
+      keyParser.parse(k);
+      Text currentVal = new Text(keyParser.getFieldValue());
+      if (k.getRow().equals(currentRow) && k.getColumnFamily().equals(currentFam) && currentVal.compareTo(fieldValueUpperBound) <= 0) {
+        topKey = k;
+        topValue = source.getTopValue();
+        return;
+        
+      } else { // need to move to next row.
+        if (this.overallRange.contains(k) && this.multiRow) {
+          // need to find the next sub range
+          // STEPS
+          // 1. check if you moved past your current row on last call to next
+          // 2. figure out next row
+          // 3. build new start key with lowerbound fvalue
+          // 4. seek the source
+          // 5. test the subrange.
+          if (k.getRow().equals(currentRow)) {
+            // get next row
+            currentRow = getNextRow();
+            if (currentRow == null) {
+              topKey = null;
+              topValue = null;
+              return;
+            }
+          } else {
+            // i'm already in the next row
+            currentRow = source.getTopKey().getRow();
+          }
+          
+          // build new startKey
+          Key sKey = new Key(currentRow, fieldName, fieldValueLowerBound);
+          Key eKey = new Key(currentRow, fieldName, fieldValueUpperBound);
+          currentSubRange = new Range(sKey, eKey);
+          source.seek(currentSubRange, EMPTY_COL_FAMS, seekInclusive);
+          
+        } else { // not multi-row or outside overall range, we're done
+          topKey = null;
+          topValue = null;
+          return;
+        }
+      }
+      
+    }
+    
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("seek, range: " + range);
+    }
+    this.overallRange = range;
+    this.seekInclusive = inclusive;
+    source.seek(range, EMPTY_COL_FAMS, inclusive);
+    topKey = null;
+    topValue = null;
+    Key sKey;
+    Key eKey;
+    
+    if (range.isInfiniteStartKey()) {
+      sKey = source.getTopKey();
+      if (sKey == null) {
+        return;
+      }
+    } else {
+      sKey = range.getStartKey();
+    }
+    
+    if (range.isInfiniteStopKey()) {
+      eKey = null;
+      this.multiRow = true; // assume we will go to the end of the tablet.
+    } else {
+      eKey = range.getEndKey();
+      if (sKey.getRow().equals(eKey.getRow())) {
+        this.multiRow = false;
+      } else {
+        this.multiRow = true;
+      }
+    }
+    
+    if (log.isDebugEnabled()) {
+      log.debug("seek, multiRow:" + multiRow + " range:" + range);
+    }
+    
+    /*
+     * NOTE: If the seek range spans multiple rows, we are only interested in the fieldName:fieldValue subranges in each row. Keys will exist in the
+     * overallRange that we will want to skip over so we need to create subranges per row so we don't have to examine every key in between.
+     */
+    
+    Text sRow = sKey.getRow();
+    Key ssKey = new Key(sRow, this.fieldName, this.fieldValueLowerBound);
+    Key eeKey = new Key(sRow, this.fieldName, this.fieldValueUpperBound);
+    this.currentSubRange = new Range(ssKey, eeKey);
+    
+    if (log.isDebugEnabled()) {
+      log.debug("seek, currentSubRange: " + currentSubRange);
+    }
+    source.seek(this.currentSubRange, columnFamilies, inclusive);
+    // cycle until we find a valid topKey, or we get ejected b/c we hit the
+    // end of the tablet or exceeded the overallRange.
+    while (topKey == null) {
+      if (source.hasTop()) {
+        Key k = source.getTopKey();
+        if (log.isDebugEnabled()) {
+          log.debug("seek, source.topKey: " + k);
+        }
+        if (currentSubRange.contains(k)) {
+          topKey = k;
+          topValue = source.getTopValue();
+          
+          if (log.isDebugEnabled()) {
+            log.debug("seek, source has top in valid range");
+          }
+          
+        } else { // outside of subRange.
+          // if multiRow mode, get the next row and seek to it
+          if (multiRow && overallRange.contains(k)) {
+            
+            Key fKey = sKey.followingKey(PartialKey.ROW);
+            Range fRange = new Range(fKey, eKey);
+            source.seek(fRange, columnFamilies, inclusive);
+            
+            if (source.hasTop()) {
+              Text row = source.getTopKey().getRow();
+              Key nKey = new Key(row, this.fieldName, this.fieldValueLowerBound);
+              this.currentSubRange = new Range(nKey, eKey);
+              sKey = this.currentSubRange.getStartKey();
+              Range nextRange = new Range(sKey, eKey);
+              source.seek(nextRange, columnFamilies, inclusive);
+            } else {
+              topKey = null;
+              topValue = null;
+              return;
+            }
+            
+          } else { // not multi row & outside range, we're done.
+            topKey = null;
+            topValue = null;
+            return;
+          }
+        }
+      } else { // source does not have top, we're done
+        topKey = null;
+        topValue = null;
+        return;
+      }
+    }
+    
+  }
+  
+  // -------------------------------------------------------------------------
+  // ------------- Internal Methods
+  private FieldIndexKeyParser createDefaultKeyParser() {
+    FieldIndexKeyParser parser = new FieldIndexKeyParser();
+    return parser;
+  }
+  
+  private Text getNextRow() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("getNextRow()");
+    }
+    Key fakeKey = new Key(source.getTopKey().followingKey(PartialKey.ROW));
+    Range fakeRange = new Range(fakeKey, fakeKey);
+    source.seek(fakeRange, EMPTY_COL_FAMS, false);
+    if (source.hasTop()) {
+      return source.getTopKey().getRow();
+    } else {
+      return null;
+    }
+  }
+}

Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/iterator/UniqFieldNameValueIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java Fri Jan  6 22:02:09 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.jexl;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.jexl2.JexlArithmetic;
+import org.apache.commons.lang.math.NumberUtils;
+
+public class Arithmetic extends JexlArithmetic {
+  
+  public Arithmetic(boolean lenient) {
+    super(lenient);
+  }
+  
+  /**
+   * This method differs from the parent in that we are not calling String.matches() because it does not match on a newline. Instead we are handling this case.
+   * 
+   * @param left
+   *          first value
+   * @param right
+   *          second value
+   * @return test result.
+   */
+  @Override
+  public boolean matches(Object left, Object right) {
+    if (left == null && right == null) {
+      // if both are null L == R
+      return true;
+    }
+    if (left == null || right == null) {
+      // we know both aren't null, therefore L != R
+      return false;
+    }
+    final String arg = left.toString();
+    if (right instanceof java.util.regex.Pattern) {
+      return ((java.util.regex.Pattern) right).matcher(arg).matches();
+    } else {
+      // return arg.matches(right.toString());
+      Pattern p = Pattern.compile(right.toString(), Pattern.DOTALL);
+      Matcher m = p.matcher(arg);
+      return m.matches();
+      
+    }
+  }
+  
+  /**
+   * This method differs from the parent class in that we are going to try and do a better job of coercing the types. As a last resort we will do a string
+   * comparison and try not to throw a NumberFormatException. The JexlArithmetic class performs coercion to a particular type if either the left or the right
+   * match a known type. We will look at the type of the right operator and try to make the left of the same type.
+   */
+  @Override
+  public boolean equals(Object left, Object right) {
+    Object fixedLeft = fixLeft(left, right);
+    return super.equals(fixedLeft, right);
+  }
+  
+  @Override
+  public boolean lessThan(Object left, Object right) {
+    Object fixedLeft = fixLeft(left, right);
+    return super.lessThan(fixedLeft, right);
+  }
+  
+  protected Object fixLeft(Object left, Object right) {
+    
+    if (null == left || null == right)
+      return left;
+    
+    if (!(right instanceof Number) && left instanceof Number) {
+      right = NumberUtils.createNumber(right.toString());
+    }
+    
+    if (right instanceof Number && left instanceof Number) {
+      if (right instanceof Double)
+        return ((Double) right).doubleValue();
+      else if (right instanceof Float)
+        return ((Float) right).floatValue();
+      else if (right instanceof Long)
+        return ((Long) right).longValue();
+      else if (right instanceof Integer)
+        return ((Integer) right).intValue();
+      else if (right instanceof Short)
+        return ((Short) right).shortValue();
+      else if (right instanceof Byte)
+        return ((Byte) right).byteValue();
+      else
+        return right;
+    }
+    if (right instanceof Number && left instanceof String) {
+      Number num = NumberUtils.createNumber(left.toString());
+      // Let's try to cast left as right's type.
+      if (this.isFloatingPointNumber(right) && this.isFloatingPointNumber(left))
+        return num;
+      else if (this.isFloatingPointNumber(right))
+        return num.doubleValue();
+      else if (right instanceof Number)
+        return num.longValue();
+    } else if (right instanceof Boolean && left instanceof String) {
+      if (left.equals("true") || left.equals("false"))
+        return Boolean.parseBoolean(left.toString());
+      
+      Number num = NumberUtils.createNumber(left.toString());
+      if (num.intValue() == 1)
+        return (Boolean) true;
+      else if (num.intValue() == 0)
+        return (Boolean) false;
+    }
+    return left;
+  }
+  
+}

Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/jexl/Arithmetic.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java Fri Jan  6 22:02:09 2012
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.logic;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.wikisearch.ingest.WikipediaMapper;
+import org.apache.accumulo.wikisearch.iterator.BooleanLogicIterator;
+import org.apache.accumulo.wikisearch.iterator.EvaluatingIterator;
+import org.apache.accumulo.wikisearch.iterator.OptimizedQueryIterator;
+import org.apache.accumulo.wikisearch.iterator.ReadAheadIterator;
+import org.apache.accumulo.wikisearch.normalizer.LcNoDiacriticsNormalizer;
+import org.apache.accumulo.wikisearch.normalizer.Normalizer;
+import org.apache.accumulo.wikisearch.parser.EventFields;
+import org.apache.accumulo.wikisearch.parser.EventFields.FieldValue;
+import org.apache.accumulo.wikisearch.parser.FieldIndexQueryReWriter;
+import org.apache.accumulo.wikisearch.parser.JexlOperatorConstants;
+import org.apache.accumulo.wikisearch.parser.QueryParser;
+import org.apache.accumulo.wikisearch.parser.QueryParser.QueryTerm;
+import org.apache.accumulo.wikisearch.parser.RangeCalculator;
+import org.apache.accumulo.wikisearch.sample.Document;
+import org.apache.accumulo.wikisearch.sample.Field;
+import org.apache.accumulo.wikisearch.sample.Results;
+import org.apache.commons.jexl2.parser.ParserTreeConstants;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * <pre>
+ * <h2>Overview</h2>
+ * Query implementation that works with the JEXL grammar. This 
+ * uses the metadata, global index, and partitioned table to return
+ * results based on the query. Example queries:
+ * 
+ *  <b>Single Term Query</b>
+ *  'foo' - looks in global index for foo, and if any entries are found, then the query
+ *          is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
+ *          down the optimized query path which uses the intersecting iterators on the partitioned
+ *          table.
+ * 
+ *  <b>Boolean expression</b>        
+ *  field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
+ *                   the query is parsed and the set of eventFields in the query that are indexed is determined by
+ *                   querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
+ *                   eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
+ * 
+ *  We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
+ * 
+ *  ==, !=, &gt;, &ge;, &lt;, &le;, =~, and !~
+ * 
+ *  Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
+ *  with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
+ *  example using this function is : "f:between(LATITUDE,60.0, 70.0)"
+ * 
+ *  <h2>Constraints on Query Structure</h2>
+ *  Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
+ *  rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. If 
+ *  an error occurs in the evaluation we are skipping the event.
+ * 
+ *  <h2>Notes on Optimization</h2>
+ *  Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
+ * 
+ *  1. An 'or' conjunction exists in the query but not all of the terms are indexed.
+ *  2. No indexed terms exist in the query
+ *  3. An unsupported operator exists in the query
+ * 
+ * </pre>
+ * 
+ */
+public abstract class AbstractQueryLogic {
+  
+  protected static Logger log = Logger.getLogger(AbstractQueryLogic.class);
+  
+  /**
+   * Set of datatypes to limit the query to.
+   */
+  public static final String DATATYPE_FILTER_SET = "datatype.filter.set";
+  
+  private static class DoNotPerformOptimizedQueryException extends Exception {
+    private static final long serialVersionUID = 1L;
+  }
+  
+  /**
+   * Object that is used to hold ranges found in the index. Subclasses may compute the final range set in various ways.
+   */
+  public static abstract class IndexRanges {
+    
+    private Map<String,String> indexValuesToOriginalValues = null;
+    private Multimap<String,String> fieldNamesAndValues = HashMultimap.create();
+    private Map<String,Long> termCardinality = new HashMap<String,Long>();
+    protected Map<String,TreeSet<Range>> ranges = new HashMap<String,TreeSet<Range>>();
+    
+    public Multimap<String,String> getFieldNamesAndValues() {
+      return fieldNamesAndValues;
+    }
+    
+    public void setFieldNamesAndValues(Multimap<String,String> fieldNamesAndValues) {
+      this.fieldNamesAndValues = fieldNamesAndValues;
+    }
+    
+    public final Map<String,Long> getTermCardinality() {
+      return termCardinality;
+    }
+    
+    public Map<String,String> getIndexValuesToOriginalValues() {
+      return indexValuesToOriginalValues;
+    }
+    
+    public void setIndexValuesToOriginalValues(Map<String,String> indexValuesToOriginalValues) {
+      this.indexValuesToOriginalValues = indexValuesToOriginalValues;
+    }
+    
+    public abstract void add(String term, Range r);
+    
+    public abstract Set<Range> getRanges();
+  }
+  
+  /**
+   * Object that computes the ranges by unioning all of the ranges for all of the terms together. In the case where ranges overlap, the largest range is used.
+   */
+  public static class UnionIndexRanges extends IndexRanges {
+    
+    public static String DEFAULT_KEY = "default";
+    
+    public UnionIndexRanges() {
+      this.ranges.put(DEFAULT_KEY, new TreeSet<Range>());
+    }
+    
+    public Set<Range> getRanges() {
+      // So the set of ranges is ordered. It *should* be the case that
+      // ranges with partition ids will sort before ranges that point to
+      // a specific event. Populate a new set of ranges but don't add a
+      // range for an event where that range is contained in a range already
+      // added.
+      Set<Text> shardsAdded = new HashSet<Text>();
+      Set<Range> returnSet = new HashSet<Range>();
+      for (Range r : ranges.get(DEFAULT_KEY)) {
+        if (!shardsAdded.contains(r.getStartKey().getRow())) {
+          // Only add ranges with a start key for the entire partition.
+          if (r.getStartKey().getColumnFamily() == null) {
+            shardsAdded.add(r.getStartKey().getRow());
+          }
+          returnSet.add(r);
+        } else {
+          // if (log.isTraceEnabled())
+          log.info("Skipping event specific range: " + r.toString() + " because range has already been added: "
+              + shardsAdded.contains(r.getStartKey().getRow()));
+        }
+      }
+      return returnSet;
+    }
+    
+    public void add(String term, Range r) {
+      ranges.get(DEFAULT_KEY).add(r);
+    }
+  }
+  
+  private String metadataTableName;
+  private String indexTableName;
+  private String reverseIndexTableName;
+  private String tableName;
+  private int queryThreads = 8;
+  private String readAheadQueueSize;
+  private String readAheadTimeOut;
+  private boolean useReadAheadIterator;
+  private Kryo kryo = new Kryo();
+  private EventFields eventFields = new EventFields();
+  private List<String> unevaluatedFields = null;
+  private int numPartitions = 0;
+  private Map<Class<? extends Normalizer>,Normalizer> normalizerCacheMap = new HashMap<Class<? extends Normalizer>,Normalizer>();
+  private static final String NULL_BYTE = "\u0000";
+  
+  public AbstractQueryLogic() {
+    super();
+    EventFields.initializeKryo(kryo);
+  }
+  
+  /**
+   * Queries metadata table to determine which terms are indexed.
+   * 
+   * @param c
+   * @param auths
+   * @param queryLiterals
+   * @param begin
+   * @param end
+   * @param datatypes
+   *          - optional list of types
+   * @return map of indexed field names to types to normalizers used in this date range
+   * @throws TableNotFoundException
+   * @throws IllegalAccessException
+   * @throws InstantiationException
+   */
+  protected Map<String,Multimap<String,Class<? extends Normalizer>>> findIndexedTerms(Connector c, Authorizations auths, Set<String> queryLiterals,
+      Set<String> datatypes) throws TableNotFoundException, InstantiationException, IllegalAccessException {
+    
+    Map<String,Multimap<String,Class<? extends Normalizer>>> results = new HashMap<String,Multimap<String,Class<? extends Normalizer>>>();
+    
+    for (String literal : queryLiterals) {
+      if (log.isDebugEnabled())
+        log.debug("Querying " + this.getMetadataTableName() + " table for " + literal);
+      Range range = new Range(literal.toUpperCase());
+      Scanner scanner = c.createScanner(this.getMetadataTableName(), auths);
+      scanner.setRange(range);
+      scanner.fetchColumnFamily(new Text(WikipediaMapper.METADATA_INDEX_COLUMN_FAMILY));
+      for (Entry<Key,Value> entry : scanner) {
+        if (!results.containsKey(literal)) {
+          Multimap<String,Class<? extends Normalizer>> m = HashMultimap.create();
+          results.put(literal, m);
+        }
+        // Get the column qualifier from the key. It contains the datatype and normalizer class
+        String colq = entry.getKey().getColumnQualifier().toString();
+        if (null != colq && colq.contains("\0")) {
+          int idx = colq.indexOf("\0");
+          if (idx != -1) {
+            String type = colq.substring(0, idx);
+            // If types are specified and this type is not in the list then skip it.
+            if (null != datatypes && !datatypes.contains(type))
+              continue;
+            try {
+              @SuppressWarnings("unchecked")
+              Class<? extends Normalizer> clazz = (Class<? extends Normalizer>) Class.forName(colq.substring(idx + 1));
+              if (!normalizerCacheMap.containsKey(clazz))
+                normalizerCacheMap.put(clazz, clazz.newInstance());
+              results.get(literal).put(type, clazz);
+            } catch (ClassNotFoundException e) {
+              log.error("Unable to find normalizer on class path: " + colq.substring(idx + 1), e);
+              results.get(literal).put(type, LcNoDiacriticsNormalizer.class);
+            }
+          } else {
+            log.warn("EventMetadata entry did not contain NULL byte: " + entry.getKey().toString());
+          }
+        } else {
+          log.warn("ColumnQualifier null in EventMetadata for key: " + entry.getKey().toString());
+        }
+      }
+    }
+    if (log.isDebugEnabled())
+      log.debug("METADATA RESULTS: " + results.toString());
+    return results;
+  }
+  
+  /**
+   * Performs a lookup in the global index for a single non-fielded term.
+   * 
+   * @param c
+   * @param auths
+   * @param value
+   * @param begin
+   * @param end
+   * @param datatypes
+   *          - optional list of types
+   * @return ranges that fit into the date range.
+   */
+  protected abstract IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> datatypes) throws TableNotFoundException;
+  
+  /**
+   * Performs a lookup in the global index / reverse index and returns a RangeCalculator
+   * 
+   * @param c
+   *          Accumulo connection
+   * @param auths
+   *          authset for queries
+   * @param indexedTerms
+   *          multimap of indexed field name and Normalizers used
+   * @param terms
+   *          multimap of field name and QueryTerm object
+   * @param begin
+   *          query begin date
+   * @param end
+   *          query end date
+   * @param dateFormatter
+   * @param indexTableName
+   * @param reverseIndexTableName
+   * @param queryString
+   *          original query string
+   * @param queryThreads
+   * @param datatypes
+   *          - optional list of types
+   * @return range calculator
+   * @throws TableNotFoundException
+   */
+  protected abstract RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
+      Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> datatypes)
+      throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException;
+  
+  protected abstract Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms);
+  
+  public String getMetadataTableName() {
+    return metadataTableName;
+  }
+  
+  public String getIndexTableName() {
+    return indexTableName;
+  }
+  
+  public String getTableName() {
+    return tableName;
+  }
+  
+  public void setMetadataTableName(String metadataTableName) {
+    this.metadataTableName = metadataTableName;
+  }
+  
+  public void setIndexTableName(String indexTableName) {
+    this.indexTableName = indexTableName;
+  }
+  
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+  
+  public int getQueryThreads() {
+    return queryThreads;
+  }
+  
+  public void setQueryThreads(int queryThreads) {
+    this.queryThreads = queryThreads;
+  }
+  
+  public String getReadAheadQueueSize() {
+    return readAheadQueueSize;
+  }
+  
+  public String getReadAheadTimeOut() {
+    return readAheadTimeOut;
+  }
+  
+  public boolean isUseReadAheadIterator() {
+    return useReadAheadIterator;
+  }
+  
+  public void setReadAheadQueueSize(String readAheadQueueSize) {
+    this.readAheadQueueSize = readAheadQueueSize;
+  }
+  
+  public void setReadAheadTimeOut(String readAheadTimeOut) {
+    this.readAheadTimeOut = readAheadTimeOut;
+  }
+  
+  public void setUseReadAheadIterator(boolean useReadAheadIterator) {
+    this.useReadAheadIterator = useReadAheadIterator;
+  }
+  
+  public String getReverseIndexTableName() {
+    return reverseIndexTableName;
+  }
+  
+  public void setReverseIndexTableName(String reverseIndexTableName) {
+    this.reverseIndexTableName = reverseIndexTableName;
+  }
+  
+  public List<String> getUnevaluatedFields() {
+    return unevaluatedFields;
+  }
+  
+  public void setUnevaluatedFields(List<String> unevaluatedFields) {
+    this.unevaluatedFields = unevaluatedFields;
+  }
+  
+  public void setUnevaluatedFields(String unevaluatedFieldList) {
+    this.unevaluatedFields = new ArrayList<String>();
+    for (String field : unevaluatedFieldList.split(","))
+      this.unevaluatedFields.add(field);
+  }
+  
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+  
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+  
+  public Document createDocument(Key key, Value value) {
+    eventFields.clear();
+    ByteBuffer buf = ByteBuffer.wrap(value.get());
+    eventFields.readObjectData(kryo, buf);
+    
+    Document doc = new Document();
+    // Set the id to the document id which is located in the colf
+    String row = key.getRow().toString();
+    String colf = key.getColumnFamily().toString();
+    int idx = colf.indexOf(NULL_BYTE);
+    String type = colf.substring(0, idx);
+    String id = colf.substring(idx + 1);
+    doc.setId(id);
+    for (Entry<String,Collection<FieldValue>> entry : eventFields.asMap().entrySet()) {
+      for (FieldValue fv : entry.getValue()) {
+        Field val = new Field();
+        val.setFieldName(entry.getKey());
+        val.setFieldValue(new String(fv.getValue(), Charset.forName("UTF-8")));
+        doc.getFields().add(val);
+      }
+    }
+    
+    // Add the pointer for the content.
+    Field docPointer = new Field();
+    docPointer.setFieldName("DOCUMENT");
+    docPointer.setFieldValue("DOCUMENT:" + row + "/" + type + "/" + id);
+    doc.getFields().add(docPointer);
+    
+    return doc;
+  }
+  
+  public String getResultsKey(Entry<Key,Value> key) {
+    // Use the colf from the table, it contains the uuid and datatype
+    return key.getKey().getColumnFamily().toString();
+  }
+  
+  public Results runQuery(Connector connector, List<String> authorizations, String query, Date beginDate, Date endDate, Set<String> types) {
+    
+    if (StringUtils.isEmpty(query)) {
+      throw new IllegalArgumentException("NULL QueryNode reference passed to " + this.getClass().getSimpleName());
+    }
+    
+    Set<Range> ranges = new HashSet<Range>();
+    Set<String> typeFilter = types;
+    String array[] = authorizations.toArray(new String[0]);
+    Authorizations auths = new Authorizations(array);
+    Results results = new Results();
+    
+    // Get the query string
+    String queryString = query;
+    
+    StopWatch abstractQueryLogic = new StopWatch();
+    StopWatch optimizedQuery = new StopWatch();
+    StopWatch queryGlobalIndex = new StopWatch();
+    StopWatch optimizedEventQuery = new StopWatch();
+    StopWatch fullScanQuery = new StopWatch();
+    StopWatch processResults = new StopWatch();
+    
+    abstractQueryLogic.start();
+    
+    StopWatch parseQuery = new StopWatch();
+    parseQuery.start();
+    
+    QueryParser parser;
+    try {
+      if (log.isDebugEnabled()) {
+        log.debug("ShardQueryLogic calling QueryParser.execute");
+      }
+      parser = new QueryParser();
+      parser.execute(queryString);
+    } catch (org.apache.commons.jexl2.parser.ParseException e1) {
+      throw new IllegalArgumentException("Error parsing query", e1);
+    }
+    int hash = parser.getHashValue();
+    parseQuery.stop();
+    if (log.isDebugEnabled()) {
+      log.debug(hash + " Query: " + queryString);
+    }
+    
+    Set<String> fields = new HashSet<String>();
+    for (String f : parser.getQueryIdentifiers()) {
+      fields.add(f);
+    }
+    if (log.isDebugEnabled()) {
+      log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString());
+    }
+    // Remove any negated fields from the fields list, we don't want to lookup negated fields
+    // in the index.
+    fields.removeAll(parser.getNegatedTermsForOptimizer());
+    
+    if (log.isDebugEnabled()) {
+      log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString());
+    }
+    // Get the mapping of field name to QueryTerm object from the query. The query term object
+    // contains the operator, whether its negated or not, and the literal to test against.
+    Multimap<String,QueryTerm> terms = parser.getQueryTerms();
+    
+    // Find out which terms are indexed
+    // TODO: Should we cache indexed terms or does that not make sense since we are always
+    // loading data.
+    StopWatch queryMetadata = new StopWatch();
+    queryMetadata.start();
+    Map<String,Multimap<String,Class<? extends Normalizer>>> metadataResults;
+    try {
+      metadataResults = findIndexedTerms(connector, auths, fields, typeFilter);
+    } catch (Exception e1) {
+      throw new RuntimeException("Error in metadata lookup", e1);
+    }
+    
+    // Create a map of indexed term to set of normalizers for it
+    Multimap<String,Normalizer> indexedTerms = HashMultimap.create();
+    for (Entry<String,Multimap<String,Class<? extends Normalizer>>> entry : metadataResults.entrySet()) {
+      // Get the normalizer from the normalizer cache
+      for (Class<? extends Normalizer> clazz : entry.getValue().values()) {
+        indexedTerms.put(entry.getKey(), normalizerCacheMap.get(clazz));
+      }
+    }
+    queryMetadata.stop();
+    if (log.isDebugEnabled()) {
+      log.debug(hash + " Indexed Terms: " + indexedTerms.toString());
+    }
+    
+    Set<String> orTerms = parser.getOrTermsForOptimizer();
+    
+    // Iterate over the query terms to get the operators specified in the query.
+    ArrayList<String> unevaluatedExpressions = new ArrayList<String>();
+    boolean unsupportedOperatorSpecified = false;
+    for (Entry<String,QueryTerm> entry : terms.entries()) {
+      if (null == entry.getValue()) {
+        continue;
+      }
+      
+      if (null != this.unevaluatedFields && this.unevaluatedFields.contains(entry.getKey().trim())) {
+        unevaluatedExpressions.add(entry.getKey().trim() + " " + entry.getValue().getOperator() + " " + entry.getValue().getValue());
+      }
+      
+      int operator = JexlOperatorConstants.getJJTNodeType(entry.getValue().getOperator());
+      if (!(operator == ParserTreeConstants.JJTEQNODE || operator == ParserTreeConstants.JJTNENODE || operator == ParserTreeConstants.JJTLENODE
+          || operator == ParserTreeConstants.JJTLTNODE || operator == ParserTreeConstants.JJTGENODE || operator == ParserTreeConstants.JJTGTNODE || operator == ParserTreeConstants.JJTERNODE)) {
+        unsupportedOperatorSpecified = true;
+        break;
+      }
+    }
+    if (null != unevaluatedExpressions)
+      unevaluatedExpressions.trimToSize();
+    if (log.isDebugEnabled()) {
+      log.debug(hash + " unsupportedOperators: " + unsupportedOperatorSpecified + " indexedTerms: " + indexedTerms.toString() + " orTerms: "
+          + orTerms.toString() + " unevaluatedExpressions: " + unevaluatedExpressions.toString());
+    }
+    
+    // We can use the intersecting iterator over the field index as an optimization under the
+    // following conditions
+    //
+    // 1. No unsupported operators in the query.
+    // 2. No 'or' operators and at least one term indexed
+    // or
+    // 1. No unsupported operators in the query.
+    // 2. and all terms indexed
+    // or
+    // 1. All or'd terms are indexed. NOTE, this will potentially skip some queries and push to a full table scan
+    // // WE should look into finding a better way to handle whether we do an optimized query or not.
+    boolean optimizationSucceeded = false;
+    boolean orsAllIndexed = false;
+    if (orTerms.isEmpty()) {
+      orsAllIndexed = false;
+    } else {
+      orsAllIndexed = indexedTerms.keySet().containsAll(orTerms);
+    }
+    
+    if (log.isDebugEnabled()) {
+      log.debug("All or terms are indexed");
+    }
+    
+    if (!unsupportedOperatorSpecified
+        && (((null == orTerms || orTerms.isEmpty()) && indexedTerms.size() > 0) || (fields.size() > 0 && indexedTerms.size() == fields.size()) || orsAllIndexed)) {
+      optimizedQuery.start();
+      // Set up intersecting iterator over field index.
+      
+      // Get information from the global index for the indexed terms. The results object will contain the term
+      // mapped to an object that contains the total count, and partitions where this term is located.
+      
+      // TODO: Should we cache indexed term information or does that not make sense since we are always loading data
+      queryGlobalIndex.start();
+      IndexRanges termIndexInfo;
+      try {
+        // If fields is null or zero, then it's probably the case that the user entered a value
+        // to search for with no fields. Check for the value in index.
+        if (fields.isEmpty()) {
+          termIndexInfo = this.getTermIndexInformation(connector, auths, queryString, typeFilter);
+          if (null != termIndexInfo && termIndexInfo.getRanges().isEmpty()) {
+            // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
+            // in unhandled locations.
+            // Break out of here by throwing a named exception and do full scan
+            throw new DoNotPerformOptimizedQueryException();
+          }
+          // We need to rewrite the query string here so that it's valid.
+          if (termIndexInfo instanceof UnionIndexRanges) {
+            UnionIndexRanges union = (UnionIndexRanges) termIndexInfo;
+            StringBuilder buf = new StringBuilder();
+            String sep = "";
+            for (String fieldName : union.getFieldNamesAndValues().keySet()) {
+              buf.append(sep).append(fieldName).append(" == ");
+              if (!(queryString.startsWith("'") && queryString.endsWith("'"))) {
+                buf.append("'").append(queryString).append("'");
+              } else {
+                buf.append(queryString);
+              }
+              sep = " or ";
+            }
+            if (log.isDebugEnabled()) {
+              log.debug("Rewrote query for non-fielded single term query: " + queryString + " to " + buf.toString());
+            }
+            queryString = buf.toString();
+          } else {
+            throw new RuntimeException("Unexpected IndexRanges implementation");
+          }
+        } else {
+          RangeCalculator calc = this.getTermIndexInformation(connector, auths, indexedTerms, terms, this.getIndexTableName(), this.getReverseIndexTableName(),
+              queryString, this.queryThreads, typeFilter);
+          if (null == calc.getResult() || calc.getResult().isEmpty()) {
+            // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
+            // in unhandled locations.
+            // Break out of here by throwing a named exception and do full scan
+            throw new DoNotPerformOptimizedQueryException();
+          }
+          termIndexInfo = new UnionIndexRanges();
+          termIndexInfo.setIndexValuesToOriginalValues(calc.getIndexValues());
+          termIndexInfo.setFieldNamesAndValues(calc.getIndexEntries());
+          termIndexInfo.getTermCardinality().putAll(calc.getTermCardinalities());
+          for (Range r : calc.getResult()) {
+            // foo is a placeholder and is ignored.
+            termIndexInfo.add("foo", r);
+          }
+        }
+      } catch (TableNotFoundException e) {
+        log.error(this.getIndexTableName() + "not found", e);
+        throw new RuntimeException(this.getIndexTableName() + "not found", e);
+      } catch (org.apache.commons.jexl2.parser.ParseException e) {
+        throw new RuntimeException("Error determining ranges for query: " + queryString, e);
+      } catch (DoNotPerformOptimizedQueryException e) {
+        log.info("Indexed fields not found in index, performing full scan");
+        termIndexInfo = null;
+      }
+      queryGlobalIndex.stop();
+      
+      // Determine if we should proceed with optimized query based on results from the global index
+      boolean proceed = false;
+      if (null == termIndexInfo || termIndexInfo.getFieldNamesAndValues().values().size() == 0) {
+        proceed = false;
+      } else if (null != orTerms && orTerms.size() > 0 && (termIndexInfo.getFieldNamesAndValues().values().size() == indexedTerms.size())) {
+        proceed = true;
+      } else if (termIndexInfo.getFieldNamesAndValues().values().size() > 0) {
+        proceed = true;
+      } else if (orsAllIndexed) {
+        proceed = true;
+      } else {
+        proceed = false;
+      }
+      if (log.isDebugEnabled()) {
+        log.debug("Proceed with optimized query: " + proceed);
+        if (null != termIndexInfo)
+          log.debug("termIndexInfo.getTermsFound().size(): " + termIndexInfo.getFieldNamesAndValues().values().size() + " indexedTerms.size: "
+              + indexedTerms.size() + " fields.size: " + fields.size());
+      }
+      if (proceed) {
+        
+        if (log.isDebugEnabled()) {
+          log.debug(hash + " Performing optimized query");
+        }
+        // Use the scan ranges from the GlobalIndexRanges object as the ranges for the batch scanner
+        ranges = termIndexInfo.getRanges();
+        if (log.isDebugEnabled()) {
+          log.info(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
+        }
+        
+        // Create BatchScanner, set the ranges, and setup the iterators.
+        optimizedEventQuery.start();
+        BatchScanner bs = null;
+        try {
+          bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
+          bs.setRanges(ranges);
+          IteratorSetting si = new IteratorSetting(21, "eval", OptimizedQueryIterator.class);
+          
+          if (log.isDebugEnabled()) {
+            log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
+          }
+          // Set the query option
+          si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
+          // Set the Indexed Terms List option. This is the field name and normalized field value pair separated
+          // by a comma.
+          StringBuilder buf = new StringBuilder();
+          String sep = "";
+          for (Entry<String,String> entry : termIndexInfo.getFieldNamesAndValues().entries()) {
+            buf.append(sep);
+            buf.append(entry.getKey());
+            buf.append(":");
+            buf.append(termIndexInfo.getIndexValuesToOriginalValues().get(entry.getValue()));
+            buf.append(":");
+            buf.append(entry.getValue());
+            if (sep.equals("")) {
+              sep = ";";
+            }
+          }
+          if (log.isDebugEnabled()) {
+            log.debug("Setting scan option: " + FieldIndexQueryReWriter.INDEXED_TERMS_LIST + " to " + buf.toString());
+          }
+          FieldIndexQueryReWriter rewriter = new FieldIndexQueryReWriter();
+          String q = "";
+          try {
+            q = queryString;
+            q = rewriter.applyCaseSensitivity(q, true, false);// Set upper/lower case for fieldname/fieldvalue
+            Map<String,String> opts = new HashMap<String,String>();
+            opts.put(FieldIndexQueryReWriter.INDEXED_TERMS_LIST, buf.toString());
+            q = rewriter.removeNonIndexedTermsAndInvalidRanges(q, opts);
+            q = rewriter.applyNormalizedTerms(q, opts);
+            if (log.isDebugEnabled()) {
+              log.debug("runServerQuery, FieldIndex Query: " + q);
+            }
+          } catch (org.apache.commons.jexl2.parser.ParseException ex) {
+            log.error("Could not parse query, Jexl ParseException: " + ex);
+          } catch (Exception ex) {
+            log.error("Problem rewriting query, Exception: " + ex.getMessage());
+          }
+          si.addOption(BooleanLogicIterator.FIELD_INDEX_QUERY, q);
+          
+          // Set the term cardinality option
+          sep = "";
+          buf.delete(0, buf.length());
+          for (Entry<String,Long> entry : termIndexInfo.getTermCardinality().entrySet()) {
+            buf.append(sep);
+            buf.append(entry.getKey());
+            buf.append(":");
+            buf.append(entry.getValue());
+            sep = ",";
+          }
+          if (log.isDebugEnabled())
+            log.debug("Setting scan option: " + BooleanLogicIterator.TERM_CARDINALITIES + " to " + buf.toString());
+          si.addOption(BooleanLogicIterator.TERM_CARDINALITIES, buf.toString());
+          if (this.useReadAheadIterator) {
+            if (log.isDebugEnabled()) {
+              log.debug("Enabling read ahead iterator with queue size: " + this.readAheadQueueSize + " and timeout: " + this.readAheadTimeOut);
+            }
+            si.addOption(ReadAheadIterator.QUEUE_SIZE, this.readAheadQueueSize);
+            si.addOption(ReadAheadIterator.TIMEOUT, this.readAheadTimeOut);
+            
+          }
+          
+          if (null != unevaluatedExpressions) {
+            StringBuilder unevaluatedExpressionList = new StringBuilder();
+            String sep2 = "";
+            for (String exp : unevaluatedExpressions) {
+              unevaluatedExpressionList.append(sep2).append(exp);
+              sep2 = ",";
+            }
+            if (log.isDebugEnabled())
+              log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
+            si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
+          }
+          
+          bs.addScanIterator(si);
+          
+          processResults.start();
+          processResults.suspend();
+          long count = 0;
+          for (Entry<Key,Value> entry : bs) {
+            count++;
+            // The key that is returned by the EvaluatingIterator is not the same key that is in
+            // the table. The value that is returned by the EvaluatingIterator is a kryo
+            // serialized EventFields object.
+            processResults.resume();
+            Document d = this.createDocument(entry.getKey(), entry.getValue());
+            results.getResults().add(d);
+            processResults.suspend();
+          }
+          log.info(count + " matching entries found in optimized query.");
+          optimizationSucceeded = true;
+          processResults.stop();
+        } catch (TableNotFoundException e) {
+          log.error(this.getTableName() + "not found", e);
+          throw new RuntimeException(this.getIndexTableName() + "not found", e);
+        } finally {
+          if (bs != null) {
+            bs.close();
+          }
+        }
+        optimizedEventQuery.stop();
+      }
+      optimizedQuery.stop();
+    }
+    
+    // WE should look into finding a better way to handle whether we do an optimized query or not.
+    // We are not setting up an else condition here because we may have aborted the logic early in the if statement.
+    if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()) && !orsAllIndexed)) {
+      // if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()))) {
+      fullScanQuery.start();
+      if (log.isDebugEnabled()) {
+        log.debug(hash + " Performing full scan query");
+      }
+      
+      // Set up a full scan using the date ranges from the query
+      // Create BatchScanner, set the ranges, and setup the iterators.
+      BatchScanner bs = null;
+      try {
+        // The ranges are the start and end dates
+        Collection<Range> r = getFullScanRange(beginDate, endDate, terms);
+        ranges.addAll(r);
+        
+        if (log.isDebugEnabled()) {
+          log.debug(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
+        }
+        
+        bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
+        bs.setRanges(ranges);
+        IteratorSetting si = new IteratorSetting(22, "eval", EvaluatingIterator.class);
+        // Create datatype regex if needed
+        if (null != typeFilter) {
+          StringBuilder buf = new StringBuilder();
+          String s = "";
+          for (String type : typeFilter) {
+            buf.append(s).append(type).append(".*");
+            s = "|";
+          }
+          if (log.isDebugEnabled())
+            log.debug("Setting colf regex iterator to: " + buf.toString());
+          IteratorSetting ri = new IteratorSetting(21, "typeFilter", RegExFilter.class);
+          RegExFilter.setRegexs(ri, null, buf.toString(), null, null, false);
+          bs.addScanIterator(ri);
+        }
+        if (log.isDebugEnabled()) {
+          log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
+        }
+        si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
+        if (null != unevaluatedExpressions) {
+          StringBuilder unevaluatedExpressionList = new StringBuilder();
+          String sep2 = "";
+          for (String exp : unevaluatedExpressions) {
+            unevaluatedExpressionList.append(sep2).append(exp);
+            sep2 = ",";
+          }
+          if (log.isDebugEnabled())
+            log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
+          si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
+        }
+        bs.addScanIterator(si);
+        long count = 0;
+        processResults.start();
+        processResults.suspend();
+        for (Entry<Key,Value> entry : bs) {
+          count++;
+          // The key that is returned by the EvaluatingIterator is not the same key that is in
+          // the partition table. The value that is returned by the EvaluatingIterator is a kryo
+          // serialized EventFields object.
+          processResults.resume();
+          Document d = this.createDocument(entry.getKey(), entry.getValue());
+          results.getResults().add(d);
+          processResults.suspend();
+        }
+        processResults.stop();
+        log.info(count + " matching entries found in full scan query.");
+      } catch (TableNotFoundException e) {
+        log.error(this.getTableName() + "not found", e);
+      } finally {
+        if (bs != null) {
+          bs.close();
+        }
+      }
+      fullScanQuery.stop();
+    }
+    
+    log.info("AbstractQueryLogic: " + queryString + " " + timeString(abstractQueryLogic.getTime()));
+    log.info("  1) parse query " + timeString(parseQuery.getTime()));
+    log.info("  2) query metadata " + timeString(queryMetadata.getTime()));
+    log.info("  3) full scan query " + timeString(fullScanQuery.getTime()));
+    log.info("  3) optimized query " + timeString(optimizedQuery.getTime()));
+    log.info("  1) process results " + timeString(processResults.getTime()));
+    log.info("      1) query global index " + timeString(queryGlobalIndex.getTime()));
+    log.info(hash + " Query completed.");
+    
+    return results;
+  }
+  
+  private static String timeString(long millis) {
+    return String.format("%4.2f", millis / 1000.);
+  }
+  
+}

Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/AbstractQueryLogic.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java Fri Jan  6 22:02:09 2012
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.logic;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.wikisearch.ingest.WikipediaMapper;
+import org.apache.accumulo.wikisearch.sample.Document;
+import org.apache.accumulo.wikisearch.sample.Field;
+import org.apache.accumulo.wikisearch.sample.Results;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This query table implementation returns a Results object that contains documents from the wiki table. The query will contain the partition id, wikitype, and
+ * UID so that we can seek directly to the document. The document is stored as base64 compressed binary in the Accumulo table. We will decompress the data so
+ * that it is base64 encoded binary data in the Results object.
+ * 
+ * The query that needs to be passed to the web service is: DOCUMENT:partitionId/wikitype/uid.
+ * 
+ */
+public class ContentLogic {
+  
+  private static final Logger log = Logger.getLogger(ContentLogic.class);
+  
+  private static final String NULL_BYTE = "\u0000";
+  
+  private String tableName = null;
+  
+  private Pattern queryPattern = Pattern.compile("^DOCUMENT:(.*)/(.*)/(.*)$");
+  
+  public String getTableName() {
+    return tableName;
+  }
+  
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+  
+  public Results runQuery(Connector connector, String query, List<String> authorizations) {
+    
+    Results results = new Results();
+    Authorizations auths = new Authorizations(StringUtils.join(authorizations, "|"));
+    
+    Matcher match = queryPattern.matcher(query);
+    if (!match.matches()) {
+      throw new IllegalArgumentException("Query does not match the pattern: DOCUMENT:partitionId/wikitype/uid, your query: " + query.toString());
+    } else {
+      String partitionId = match.group(1);
+      String wikitype = match.group(2);
+      String id = match.group(3);
+      
+      log.debug("Received pieces: " + partitionId + ", " + wikitype + ", " + id);
+      
+      // Create the Range
+      Key startKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype + NULL_BYTE + id);
+      Key endKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype + NULL_BYTE + id + NULL_BYTE);
+      Range r = new Range(startKey, true, endKey, false);
+      
+      log.debug("Setting range: " + r);
+      
+      try {
+        Scanner scanner = connector.createScanner(this.getTableName(), auths);
+        scanner.setRange(r);
+        // This should in theory only match one thing.
+        for (Entry<Key,Value> entry : scanner) {
+          Document doc = new Document();
+          doc.setId(id);
+          Field val = new Field();
+          val.setFieldName("DOCUMENT");
+          val.setFieldValue(new String(Base64.decodeBase64(entry.getValue().toString())));
+          doc.getFields().add(val);
+          results.getResults().add(doc);
+        }
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException("Table not found: " + this.getTableName(), e);
+      }
+      
+    }
+    return results;
+  }
+  
+}

Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/ContentLogic.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java?rev=1228459&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java (added)
+++ incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java Fri Jan  6 22:02:09 2012
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.wikisearch.logic;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.wikisearch.iterator.EvaluatingIterator;
+import org.apache.accumulo.wikisearch.normalizer.LcNoDiacriticsNormalizer;
+import org.apache.accumulo.wikisearch.normalizer.Normalizer;
+import org.apache.accumulo.wikisearch.parser.QueryParser.QueryTerm;
+import org.apache.accumulo.wikisearch.parser.RangeCalculator;
+import org.apache.accumulo.wikisearch.protobuf.Uid;
+import org.apache.accumulo.wikisearch.util.TextUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Multimap;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * <pre>
+ * <h2>Overview</h2>
+ * QueryTable implementation that works with the JEXL grammar. This QueryTable
+ * uses the metadata, global index, and partitioned table to return
+ * results based on the query. Example queries:
+ * 
+ *  <b>Single Term Query</b>
+ *  'foo' - looks in global index for foo, and if any entries are found, then the query
+ *          is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
+ *          down the optimized query path which uses the intersecting iterators on the shard
+ *          table.
+ * 
+ *  <b>Boolean expression</b>        
+ *  field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
+ *                   the query is parsed and the set of eventFields in the query that are indexed is determined by
+ *                   querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
+ *                   eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
+ * 
+ *  We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
+ * 
+ *  ==, !=, &gt;, &ge;, &lt;, &le;, =~, and !~
+ * 
+ *  Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
+ *  with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
+ *  example using this function is : "f:between(LATITUDE,60.0, 70.0)"
+ * 
+ *  <h2>Constraints on Query Structure</h2>
+ *  Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
+ *  rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. Users
+ *  should also be aware that the literals used in the query need to match the data in the table. If an error occurs in the evaluation 
+ *  we are skipping the event.
+ * 
+ *  <h2>Notes on Optimization</h2>
+ *  Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
+ * 
+ *  1. An 'or' conjunction exists in the query but not all of the terms are indexed.
+ *  2. No indexed terms exist in the query
+ *  3. An unsupported operator exists in the query
+ * 
+ * </pre>
+ * 
+ */
+public class QueryLogic extends AbstractQueryLogic {
+  
+  protected static Logger log = Logger.getLogger(QueryLogic.class);
+  
+  private static String startPartition = "0";
+  
+  public QueryLogic() {
+    super();
+  }
+  
+  @Override
+  protected RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
+      Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> typeFilter)
+      throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException {
+    RangeCalculator calc = new RangeCalculator();
+    calc.execute(c, auths, indexedTerms, terms, queryString, this, typeFilter);
+    return calc;
+  }
+  
+  protected Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms) {
+    String startKey = startPartition;
+    String endKey = Integer.toString(this.getNumPartitions());
+    Range r = new Range(startKey, true, endKey, false);
+    return Collections.singletonList(r);
+  }
+  
+  @Override
+  protected IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> typeFilter) throws TableNotFoundException {
+    final String dummyTermName = "DUMMY";
+    UnionIndexRanges indexRanges = new UnionIndexRanges();
+    
+    // The entries in the index are normalized, since we don't have a field, just try using the LcNoDiacriticsNormalizer.
+    String normalizedFieldValue = new LcNoDiacriticsNormalizer().normalizeFieldValue("", value);
+    // Remove the begin and end ' marks
+    if (normalizedFieldValue.startsWith("'") && normalizedFieldValue.endsWith("'")) {
+      normalizedFieldValue = normalizedFieldValue.substring(1, normalizedFieldValue.length() - 1);
+    }
+    Text fieldValue = new Text(normalizedFieldValue);
+    if (log.isDebugEnabled()) {
+      log.debug("Querying index table : " + this.getIndexTableName() + " for normalized indexed term: " + fieldValue);
+    }
+    Scanner scanner = c.createScanner(this.getIndexTableName(), auths);
+    Range r = new Range(fieldValue);
+    scanner.setRange(r);
+    if (log.isDebugEnabled()) {
+      log.debug("Range for index query: " + r.toString());
+    }
+    for (Entry<Key,Value> entry : scanner) {
+      if (log.isDebugEnabled()) {
+        log.debug("Index entry: " + entry.getKey().toString());
+      }
+      // Get the shard id and datatype from the colq
+      String fieldName = entry.getKey().getColumnFamily().toString();
+      String colq = entry.getKey().getColumnQualifier().toString();
+      int separator = colq.indexOf(EvaluatingIterator.NULL_BYTE_STRING);
+      String shardId = null;
+      String datatype = null;
+      if (separator != -1) {
+        shardId = colq.substring(0, separator);
+        datatype = colq.substring(separator + 1);
+      } else {
+        shardId = colq;
+      }
+      // Skip this entry if the type is not correct
+      if (null != datatype && null != typeFilter && !typeFilter.contains(datatype))
+        continue;
+      // Parse the UID.List object from the value
+      Uid.List uidList = null;
+      try {
+        uidList = Uid.List.parseFrom(entry.getValue().get());
+      } catch (InvalidProtocolBufferException e) {
+        // Don't add UID information, at least we know what shards
+        // it is located in.
+      }
+      
+      // Add the count for this shard to the total count for the term.
+      long count = 0;
+      Long storedCount = indexRanges.getTermCardinality().get(dummyTermName);
+      if (null == storedCount) {
+        count = uidList.getCOUNT();
+      } else {
+        count = uidList.getCOUNT() + storedCount;
+      }
+      indexRanges.getTermCardinality().put(dummyTermName, count);
+      // Add the field name
+      indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);
+      
+      // Create the keys
+      Text shard = new Text(shardId);
+      if (uidList.getIGNORE()) {
+        // Then we create a scan range that is the entire shard
+        indexRanges.add(dummyTermName, new Range(shard));
+      } else {
+        // We should have UUIDs, create event ranges
+        for (String uuid : uidList.getUIDList()) {
+          Text cf = new Text(datatype);
+          TextUtil.textAppend(cf, uuid);
+          Key startKey = new Key(shard, cf);
+          Key endKey = new Key(shard, new Text(cf.toString() + EvaluatingIterator.NULL_BYTE_STRING));
+          Range eventRange = new Range(startKey, true, endKey, false);
+          indexRanges.add(dummyTermName, eventRange);
+        }
+      }
+    }
+    if (log.isDebugEnabled()) {
+      log.debug("Found " + indexRanges.getRanges().size() + " entries in the index for field value: " + normalizedFieldValue);
+    }
+    return indexRanges;
+    
+  }
+  
+}

Propchange: incubator/accumulo/branches/1.4/src/wikisearch/query/src/main/java/org/apache/accumulo/wikisearch/logic/QueryLogic.java
------------------------------------------------------------------------------
    svn:eol-style = native