You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC
svn commit: r1210600 [10/16] - in
/incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/
ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/
ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/
ingest/src/main/java/protobuf/...
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java Mon Dec 5 20:05:49 2011
@@ -1,22 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package logic;
-
import ingest.WikipediaMapper;
import iterator.BooleanLogicIterator;
import iterator.EvaluatingIterator;
@@ -77,830 +76,835 @@ import com.google.common.collect.Multima
* Query implementation that works with the JEXL grammar. This
* uses the metadata, global index, and partitioned table to return
* results based on the query. Example queries:
- *
+ *
* <b>Single Term Query</b>
* 'foo' - looks in global index for foo, and if any entries are found, then the query
* is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
* down the optimized query path which uses the intersecting iterators on the partitioned
* table.
- *
+ *
* <b>Boolean expression</b>
* field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
* the query is parsed and the set of eventFields in the query that are indexed is determined by
* querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
* eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
- *
+ *
* We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
- *
+ *
* ==, !=, >, ≥, <, ≤, =~, and !~
- *
+ *
* Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
* with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
* example using this function is : "f:between(LATITUDE,60.0, 70.0)"
- *
+ *
* <h2>Constraints on Query Structure</h2>
* Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
* rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. If
* an error occurs in the evaluation we are skipping the event.
- *
+ *
* <h2>Notes on Optimization</h2>
* Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
- *
+ *
* 1. An 'or' conjunction exists in the query but not all of the terms are indexed.
* 2. No indexed terms exist in the query
* 3. An unsupported operator exists in the query
- *
+ *
* </pre>
- *
+ *
*/
public abstract class AbstractQueryLogic {
-
- protected static Logger log = Logger.getLogger(AbstractQueryLogic.class);
+
+ protected static Logger log = Logger.getLogger(AbstractQueryLogic.class);
+
+ /**
+ * Set of datatypes to limit the query to.
+ */
+ public static final String DATATYPE_FILTER_SET = "datatype.filter.set";
+
+ private static class DoNotPerformOptimizedQueryException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * Object that is used to hold ranges found in the index. Subclasses may compute the final range set in various ways.
+ */
+ public static abstract class IndexRanges {
- /**
- * Set of datatypes to limit the query to.
- */
- public static final String DATATYPE_FILTER_SET = "datatype.filter.set";
-
- private static class DoNotPerformOptimizedQueryException extends Exception {
- private static final long serialVersionUID = 1L;
- }
-
- /**
- * Object that is used to hold ranges found in the index. Subclasses may compute
- * the final range set in various ways.
- */
- public static abstract class IndexRanges {
-
- private Map<String,String> indexValuesToOriginalValues = null;
- private Multimap<String,String> fieldNamesAndValues = HashMultimap.create();
- private Map<String, Long> termCardinality = new HashMap<String, Long>();
- protected Map<String, TreeSet<Range>> ranges = new HashMap<String, TreeSet<Range>>();
-
-
- public Multimap<String, String> getFieldNamesAndValues() {
- return fieldNamesAndValues;
- }
-
- public void setFieldNamesAndValues(Multimap<String, String> fieldNamesAndValues) {
- this.fieldNamesAndValues = fieldNamesAndValues;
- }
-
- public final Map<String, Long> getTermCardinality() {
- return termCardinality;
- }
-
- public Map<String, String> getIndexValuesToOriginalValues() {
- return indexValuesToOriginalValues;
- }
-
- public void setIndexValuesToOriginalValues(
- Map<String, String> indexValuesToOriginalValues) {
- this.indexValuesToOriginalValues = indexValuesToOriginalValues;
- }
-
- public abstract void add(String term, Range r);
-
- public abstract Set<Range> getRanges();
- }
-
- /**
- * Object that computes the ranges by unioning all of the ranges for all
- * of the terms together. In the case where ranges overlap, the largest range
- * is used.
- */
- public static class UnionIndexRanges extends IndexRanges {
-
- public static String DEFAULT_KEY = "default";
-
- public UnionIndexRanges() {
- this.ranges.put(DEFAULT_KEY, new TreeSet<Range>());
- }
-
- public Set<Range> getRanges() {
- //So the set of ranges is ordered. It *should* be the case that
- //ranges with partition ids will sort before ranges that point to
- //a specific event. Populate a new set of ranges but don't add a
- //range for an event where that range is contained in a range already
- //added.
- Set<Text> shardsAdded = new HashSet<Text>();
- Set<Range> returnSet = new HashSet<Range>();
- for (Range r : ranges.get(DEFAULT_KEY)) {
- if (!shardsAdded.contains(r.getStartKey().getRow())) {
- //Only add ranges with a start key for the entire partition.
- if (r.getStartKey().getColumnFamily() == null) {
- shardsAdded.add(r.getStartKey().getRow());
- }
- returnSet.add(r);
- } else {
- //if (log.isTraceEnabled())
- log.info("Skipping event specific range: " + r.toString() + " because range has already been added: " + shardsAdded.contains(r.getStartKey().getRow()));
- }
- }
- return returnSet;
- }
-
- public void add(String term, Range r) {
- ranges.get(DEFAULT_KEY).add(r);
- }
+ private Map<String,String> indexValuesToOriginalValues = null;
+ private Multimap<String,String> fieldNamesAndValues = HashMultimap.create();
+ private Map<String,Long> termCardinality = new HashMap<String,Long>();
+ protected Map<String,TreeSet<Range>> ranges = new HashMap<String,TreeSet<Range>>();
+
+ public Multimap<String,String> getFieldNamesAndValues() {
+ return fieldNamesAndValues;
}
-
- private String metadataTableName;
- private String indexTableName;
- private String reverseIndexTableName;
- private String tableName;
- private int queryThreads = 8;
- private String readAheadQueueSize;
- private String readAheadTimeOut;
- private boolean useReadAheadIterator;
- private Kryo kryo = new Kryo();
- private EventFields eventFields = new EventFields();
- private List<String> unevaluatedFields = null;
- private int numPartitions = 0;
- private Map<Class<? extends Normalizer>, Normalizer> normalizerCacheMap = new HashMap<Class<? extends Normalizer>, Normalizer>();
- private static final String NULL_BYTE = "\u0000";
-
- public AbstractQueryLogic() {
- super();
- EventFields.initializeKryo(kryo);
+
+ public void setFieldNamesAndValues(Multimap<String,String> fieldNamesAndValues) {
+ this.fieldNamesAndValues = fieldNamesAndValues;
}
-
- /**
- * Queries metadata table to determine which terms are indexed.
- * @param c
- * @param auths
- * @param queryLiterals
- * @param begin
- * @param end
- * @param datatypes - optional list of types
- * @return map of indexed field names to types to normalizers used in this date range
- * @throws TableNotFoundException
- * @throws IllegalAccessException
- * @throws InstantiationException
- */
- protected Map<String, Multimap<String, Class<? extends Normalizer>>> findIndexedTerms(Connector c, Authorizations auths, Set<String> queryLiterals, Set<String> datatypes) throws TableNotFoundException, InstantiationException, IllegalAccessException {
-
- Map<String, Multimap<String, Class<? extends Normalizer>>> results = new HashMap<String, Multimap<String, Class<? extends Normalizer>>>();
-
- for (String literal : queryLiterals) {
- if(log.isDebugEnabled())
- log.debug("Querying "+this.getMetadataTableName()+" table for " + literal);
- Range range = new Range(literal.toUpperCase());
- Scanner scanner = c.createScanner(this.getMetadataTableName(), auths);
- scanner.setRange(range);
- scanner.fetchColumnFamily(new Text(WikipediaMapper.METADATA_INDEX_COLUMN_FAMILY));
- for (Entry<Key, Value> entry : scanner) {
- if (!results.containsKey(literal)) {
- Multimap<String, Class<? extends Normalizer>> m = HashMultimap.create();
- results.put(literal, m);
- }
- //Get the column qualifier from the key. It contains the datatype and normalizer class
- String colq = entry.getKey().getColumnQualifier().toString();
- if (null != colq && colq.contains("\0")) {
- int idx = colq.indexOf("\0");
- if (idx != -1) {
- String type = colq.substring(0, idx);
- //If types are specified and this type is not in the list then skip it.
- if (null != datatypes && !datatypes.contains(type))
- continue;
- try {
- @SuppressWarnings("unchecked")
- Class<? extends Normalizer> clazz = (Class<? extends Normalizer>) Class.forName(colq.substring(idx+1));
- if (!normalizerCacheMap.containsKey(clazz))
- normalizerCacheMap.put(clazz, clazz.newInstance());
- results.get(literal).put(type, clazz);
- } catch (ClassNotFoundException e) {
- log.error("Unable to find normalizer on class path: " + colq.substring(idx+1), e);
- results.get(literal).put(type, LcNoDiacriticsNormalizer.class);
- }
- } else {
- log.warn("EventMetadata entry did not contain NULL byte: " + entry.getKey().toString());
- }
- } else {
- log.warn("ColumnQualifier null in EventMetadata for key: " + entry.getKey().toString());
- }
- }
- }
- if (log.isDebugEnabled())
- log.debug("METADATA RESULTS: " + results.toString());
- return results;
+
+ public final Map<String,Long> getTermCardinality() {
+ return termCardinality;
}
-
- /**
- * Performs a lookup in the global index for a single non-fielded term.
- *
- * @param c
- * @param auths
- * @param value
- * @param begin
- * @param end
- * @param datatypes - optional list of types
- * @return ranges that fit into the date range.
- */
- protected abstract IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> datatypes) throws TableNotFoundException;
-
-
- /**
- * Performs a lookup in the global index / reverse index and returns a RangeCalculator
- *
- * @param c Accumulo connection
- * @param auths authset for queries
- * @param indexedTerms multimap of indexed field name and Normalizers used
- * @param terms multimap of field name and QueryTerm object
- * @param begin query begin date
- * @param end query end date
- * @param dateFormatter
- * @param indexTableName
- * @param reverseIndexTableName
- * @param queryString original query string
- * @param queryThreads
- * @param datatypes - optional list of types
- * @return range calculator
- * @throws TableNotFoundException
- */
- protected abstract RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String, Normalizer> indexedTerms, Multimap<String, QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> datatypes) throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException;
- protected abstract Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String, QueryTerm> terms);
-
- public String getMetadataTableName() {
- return metadataTableName;
+ public Map<String,String> getIndexValuesToOriginalValues() {
+ return indexValuesToOriginalValues;
}
-
- public String getIndexTableName() {
- return indexTableName;
+
+ public void setIndexValuesToOriginalValues(Map<String,String> indexValuesToOriginalValues) {
+ this.indexValuesToOriginalValues = indexValuesToOriginalValues;
}
-
- public String getTableName() {
- return tableName;
+
+ public abstract void add(String term, Range r);
+
+ public abstract Set<Range> getRanges();
+ }
+
+ /**
+ * Object that computes the ranges by unioning all of the ranges for all of the terms together. In the case where ranges overlap, the largest range is used.
+ */
+ public static class UnionIndexRanges extends IndexRanges {
+
+ public static String DEFAULT_KEY = "default";
+
+ public UnionIndexRanges() {
+ this.ranges.put(DEFAULT_KEY, new TreeSet<Range>());
}
-
- public void setMetadataTableName(String metadataTableName) {
- this.metadataTableName = metadataTableName;
+
+ public Set<Range> getRanges() {
+ // So the set of ranges is ordered. It *should* be the case that
+ // ranges with partition ids will sort before ranges that point to
+ // a specific event. Populate a new set of ranges but don't add a
+ // range for an event where that range is contained in a range already
+ // added.
+ Set<Text> shardsAdded = new HashSet<Text>();
+ Set<Range> returnSet = new HashSet<Range>();
+ for (Range r : ranges.get(DEFAULT_KEY)) {
+ if (!shardsAdded.contains(r.getStartKey().getRow())) {
+ // Every range not already covered is returned; only mark the shard as covered when the start key spans the entire partition (no column family).
+ if (r.getStartKey().getColumnFamily() == null) {
+ shardsAdded.add(r.getStartKey().getRow());
+ }
+ returnSet.add(r);
+ } else {
+ // if (log.isTraceEnabled())
+ log.info("Skipping event specific range: " + r.toString() + " because range has already been added: "
+ + shardsAdded.contains(r.getStartKey().getRow()));
+ }
+ }
+ return returnSet;
}
-
- public void setIndexTableName(String indexTableName) {
- this.indexTableName = indexTableName;
+
+ public void add(String term, Range r) {
+ ranges.get(DEFAULT_KEY).add(r);
}
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
+ }
+
+ private String metadataTableName;
+ private String indexTableName;
+ private String reverseIndexTableName;
+ private String tableName;
+ private int queryThreads = 8;
+ private String readAheadQueueSize;
+ private String readAheadTimeOut;
+ private boolean useReadAheadIterator;
+ private Kryo kryo = new Kryo();
+ private EventFields eventFields = new EventFields();
+ private List<String> unevaluatedFields = null;
+ private int numPartitions = 0;
+ private Map<Class<? extends Normalizer>,Normalizer> normalizerCacheMap = new HashMap<Class<? extends Normalizer>,Normalizer>();
+ private static final String NULL_BYTE = "\u0000";
+
+ public AbstractQueryLogic() {
+ super();
+ EventFields.initializeKryo(kryo);
+ }
+
+ /**
+ * Queries metadata table to determine which terms are indexed.
+ *
+ * @param c
+ * @param auths
+ * @param queryLiterals
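+ * @param datatypes
+ * - optional set of datatypes; when non-null, metadata entries for any other type are skipped
+ * @return map of each query literal found in the metadata table to a multimap of datatype to the
+ * normalizer classes registered for it. There are no begin/end parameters: the lookup is not
+ * restricted to a date range.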
+ * @throws TableNotFoundException
+ * @throws IllegalAccessException
+ * @throws InstantiationException
+ */
+ protected Map<String,Multimap<String,Class<? extends Normalizer>>> findIndexedTerms(Connector c, Authorizations auths, Set<String> queryLiterals,
+ Set<String> datatypes) throws TableNotFoundException, InstantiationException, IllegalAccessException {
+
+ Map<String,Multimap<String,Class<? extends Normalizer>>> results = new HashMap<String,Multimap<String,Class<? extends Normalizer>>>();
+
+ for (String literal : queryLiterals) {
+ if (log.isDebugEnabled())
+ log.debug("Querying " + this.getMetadataTableName() + " table for " + literal);
+ Range range = new Range(literal.toUpperCase());
+ Scanner scanner = c.createScanner(this.getMetadataTableName(), auths);
+ scanner.setRange(range);
+ scanner.fetchColumnFamily(new Text(WikipediaMapper.METADATA_INDEX_COLUMN_FAMILY));
+ for (Entry<Key,Value> entry : scanner) {
+ if (!results.containsKey(literal)) {
+ Multimap<String,Class<? extends Normalizer>> m = HashMultimap.create();
+ results.put(literal, m);
+ }
+ // Get the column qualifier from the key. It contains the datatype and normalizer class
+ String colq = entry.getKey().getColumnQualifier().toString();
+ if (null != colq && colq.contains("\0")) {
+ int idx = colq.indexOf("\0");
+ if (idx != -1) {
+ String type = colq.substring(0, idx);
+ // If types are specified and this type is not in the list then skip it.
+ if (null != datatypes && !datatypes.contains(type))
+ continue;
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends Normalizer> clazz = (Class<? extends Normalizer>) Class.forName(colq.substring(idx + 1));
+ if (!normalizerCacheMap.containsKey(clazz))
+ normalizerCacheMap.put(clazz, clazz.newInstance());
+ results.get(literal).put(type, clazz);
+ } catch (ClassNotFoundException e) {
+ log.error("Unable to find normalizer on class path: " + colq.substring(idx + 1), e);
+ results.get(literal).put(type, LcNoDiacriticsNormalizer.class);
+ }
+ } else {
+ log.warn("EventMetadata entry did not contain NULL byte: " + entry.getKey().toString());
+ }
+ } else {
+ log.warn("ColumnQualifier null in EventMetadata for key: " + entry.getKey().toString());
+ }
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug("METADATA RESULTS: " + results.toString());
+ return results;
+ }
+
+ /**
+ * Performs a lookup in the global index for a single non-fielded term.
+ *
+ * @param c
+ * @param auths
+ * @param value
+ * @param begin
+ * @param end
+ * @param datatypes
+ * - optional list of types
+ * @return ranges that fit into the date range.
+ */
+ protected abstract IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> datatypes) throws TableNotFoundException;
+
+ /**
+ * Performs a lookup in the global index / reverse index and returns a RangeCalculator
+ *
+ * @param c
+ * Accumulo connection
+ * @param auths
+ * authset for queries
+ * @param indexedTerms
+ * multimap of indexed field name and Normalizers used
+ * @param terms
+ * multimap of field name and QueryTerm object
+ * @param begin
+ * query begin date
+ * @param end
+ * query end date
+ * @param dateFormatter
+ * @param indexTableName
+ * @param reverseIndexTableName
+ * @param queryString
+ * original query string
+ * @param queryThreads
+ * @param datatypes
+ * - optional list of types
+ * @return range calculator
+ * @throws TableNotFoundException
+ */
+ protected abstract RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
+ Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> datatypes)
+ throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException;
+
+ protected abstract Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms);
+
+ public String getMetadataTableName() {
+ return metadataTableName;
+ }
+
+ public String getIndexTableName() {
+ return indexTableName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setMetadataTableName(String metadataTableName) {
+ this.metadataTableName = metadataTableName;
+ }
+
+ public void setIndexTableName(String indexTableName) {
+ this.indexTableName = indexTableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public int getQueryThreads() {
+ return queryThreads;
+ }
+
+ public void setQueryThreads(int queryThreads) {
+ this.queryThreads = queryThreads;
+ }
+
+ public String getReadAheadQueueSize() {
+ return readAheadQueueSize;
+ }
+
+ public String getReadAheadTimeOut() {
+ return readAheadTimeOut;
+ }
+
+ public boolean isUseReadAheadIterator() {
+ return useReadAheadIterator;
+ }
+
+ public void setReadAheadQueueSize(String readAheadQueueSize) {
+ this.readAheadQueueSize = readAheadQueueSize;
+ }
+
+ public void setReadAheadTimeOut(String readAheadTimeOut) {
+ this.readAheadTimeOut = readAheadTimeOut;
+ }
+
+ public void setUseReadAheadIterator(boolean useReadAheadIterator) {
+ this.useReadAheadIterator = useReadAheadIterator;
+ }
+
+ public String getReverseIndexTableName() {
+ return reverseIndexTableName;
+ }
+
+ public void setReverseIndexTableName(String reverseIndexTableName) {
+ this.reverseIndexTableName = reverseIndexTableName;
+ }
+
+ public List<String> getUnevaluatedFields() {
+ return unevaluatedFields;
+ }
+
+ public void setUnevaluatedFields(List<String> unevaluatedFields) {
+ this.unevaluatedFields = unevaluatedFields;
+ }
+
+ public void setUnevaluatedFields(String unevaluatedFieldList) {
+ this.unevaluatedFields = new ArrayList<String>();
+ for (String field : unevaluatedFieldList.split(","))
+ this.unevaluatedFields.add(field);
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setNumPartitions(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ public Document createDocument(Key key, Value value) {
+ eventFields.clear();
+ ByteBuffer buf = ByteBuffer.wrap(value.get());
+ eventFields.readObjectData(kryo, buf);
+
+ Document doc = new Document();
+ // Set the id to the document id which is located in the colf
+ String row = key.getRow().toString();
+ String colf = key.getColumnFamily().toString();
+ int idx = colf.indexOf(NULL_BYTE);
+ String type = colf.substring(0, idx);
+ String id = colf.substring(idx + 1);
+ doc.setId(id);
+ for (Entry<String,Collection<FieldValue>> entry : eventFields.asMap().entrySet()) {
+ for (FieldValue fv : entry.getValue()) {
+ Field val = new Field();
+ val.setFieldName(entry.getKey());
+ val.setFieldValue(new String(fv.getValue(), Charset.forName("UTF-8")));
+ doc.getFields().add(val);
+ }
}
-
- public int getQueryThreads() {
- return queryThreads;
+
+ // Add the pointer for the content.
+ Field docPointer = new Field();
+ docPointer.setFieldName("DOCUMENT");
+ docPointer.setFieldValue("DOCUMENT:" + row + "/" + type + "/" + id);
+ doc.getFields().add(docPointer);
+
+ return doc;
+ }
+
+ public String getResultsKey(Entry<Key,Value> key) {
+ // Use the colf from the table, it contains the uuid and datatype
+ return key.getKey().getColumnFamily().toString();
+ }
+
+ public Results runQuery(Connector connector, List<String> authorizations, String query, Date beginDate, Date endDate, Set<String> types) {
+
+ if (StringUtils.isEmpty(query)) {
+ throw new IllegalArgumentException("NULL QueryNode reference passed to " + this.getClass().getSimpleName());
}
-
- public void setQueryThreads(int queryThreads) {
- this.queryThreads = queryThreads;
+
+ Set<Range> ranges = new HashSet<Range>();
+ Set<String> typeFilter = types;
+ Authorizations auths = new Authorizations(StringUtils.join(authorizations, "|"));
+ Results results = new Results();
+
+ // Get the query string
+ String queryString = query;
+
+ StopWatch abstractQueryLogic = new StopWatch();
+ StopWatch optimizedQuery = new StopWatch();
+ StopWatch queryGlobalIndex = new StopWatch();
+ StopWatch optimizedEventQuery = new StopWatch();
+ StopWatch fullScanQuery = new StopWatch();
+ StopWatch processResults = new StopWatch();
+
+ abstractQueryLogic.start();
+
+ StopWatch parseQuery = new StopWatch();
+ parseQuery.start();
+
+ QueryParser parser;
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("ShardQueryLogic calling QueryParser.execute");
+ }
+ parser = new QueryParser();
+ parser.execute(queryString);
+ } catch (org.apache.commons.jexl2.parser.ParseException e1) {
+ throw new IllegalArgumentException("Error parsing query", e1);
+ }
+ int hash = parser.getHashValue();
+ parseQuery.stop();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Query: " + queryString);
}
-
- public String getReadAheadQueueSize() {
- return readAheadQueueSize;
+
+ Set<String> fields = new HashSet<String>();
+ for (String f : parser.getQueryIdentifiers()) {
+ fields.add(f);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString());
+ }
+ // Remove any negated fields from the fields list, we don't want to lookup negated fields
+ // in the index.
+ fields.removeAll(parser.getNegatedTermsForOptimizer());
+
+ if (log.isDebugEnabled()) {
+ log.debug("getQueryIdentifiers: " + parser.getQueryIdentifiers().toString());
}
-
- public String getReadAheadTimeOut() {
- return readAheadTimeOut;
+ // Get the mapping of field name to QueryTerm object from the query. The query term object
+ // contains the operator, whether it's negated or not, and the literal to test against.
+ Multimap<String,QueryTerm> terms = parser.getQueryTerms();
+
+ // Find out which terms are indexed
+ // TODO: Should we cache indexed terms or does that not make sense since we are always
+ // loading data.
+ StopWatch queryMetadata = new StopWatch();
+ queryMetadata.start();
+ Map<String,Multimap<String,Class<? extends Normalizer>>> metadataResults;
+ try {
+ metadataResults = findIndexedTerms(connector, auths, fields, typeFilter);
+ } catch (Exception e1) {
+ throw new RuntimeException("Error in metadata lookup", e1);
}
-
- public boolean isUseReadAheadIterator() {
- return useReadAheadIterator;
+
+ // Create a map of indexed term to set of normalizers for it
+ Multimap<String,Normalizer> indexedTerms = HashMultimap.create();
+ for (Entry<String,Multimap<String,Class<? extends Normalizer>>> entry : metadataResults.entrySet()) {
+ // Get the normalizer from the normalizer cache
+ for (Class<? extends Normalizer> clazz : entry.getValue().values()) {
+ indexedTerms.put(entry.getKey(), normalizerCacheMap.get(clazz));
+ }
+ }
+ queryMetadata.stop();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Indexed Terms: " + indexedTerms.toString());
}
-
- public void setReadAheadQueueSize(String readAheadQueueSize) {
- this.readAheadQueueSize = readAheadQueueSize;
+
+ Set<String> orTerms = parser.getOrTermsForOptimizer();
+
+ // Iterate over the query terms to get the operators specified in the query.
+ ArrayList<String> unevaluatedExpressions = new ArrayList<String>();
+ boolean unsupportedOperatorSpecified = false;
+ for (Entry<String,QueryTerm> entry : terms.entries()) {
+ if (null == entry.getValue()) {
+ continue;
+ }
+
+ if (null != this.unevaluatedFields && this.unevaluatedFields.contains(entry.getKey().trim())) {
+ unevaluatedExpressions.add(entry.getKey().trim() + " " + entry.getValue().getOperator() + " " + entry.getValue().getValue());
+ }
+
+ int operator = JexlOperatorConstants.getJJTNodeType(entry.getValue().getOperator());
+ if (!(operator == ParserTreeConstants.JJTEQNODE || operator == ParserTreeConstants.JJTNENODE || operator == ParserTreeConstants.JJTLENODE
+ || operator == ParserTreeConstants.JJTLTNODE || operator == ParserTreeConstants.JJTGENODE || operator == ParserTreeConstants.JJTGTNODE || operator == ParserTreeConstants.JJTERNODE)) {
+ unsupportedOperatorSpecified = true;
+ break;
+ }
+ }
+ if (null != unevaluatedExpressions)
+ unevaluatedExpressions.trimToSize();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " unsupportedOperators: " + unsupportedOperatorSpecified + " indexedTerms: " + indexedTerms.toString() + " orTerms: "
+ + orTerms.toString() + " unevaluatedExpressions: " + unevaluatedExpressions.toString());
}
-
- public void setReadAheadTimeOut(String readAheadTimeOut) {
- this.readAheadTimeOut = readAheadTimeOut;
+
+ // We can use the intersecting iterator over the field index as an optimization under the
+ // following conditions
+ //
+ // 1. No unsupported operators in the query.
+ // 2. No 'or' operators and at least one term indexed
+ // or
+ // 1. No unsupported operators in the query.
+ // 2. and all terms indexed
+ // or
+ // 1. No unsupported operators and all or'd terms are indexed. NOTE: this will potentially skip some queries and push them to a full table scan.
+ // We should look into finding a better way to decide whether we do an optimized query or not.
+ boolean optimizationSucceeded = false;
+ boolean orsAllIndexed = false;
+ if (orTerms.isEmpty()) {
+ orsAllIndexed = false;
+ } else {
+ orsAllIndexed = indexedTerms.keySet().containsAll(orTerms);
}
-
- public void setUseReadAheadIterator(boolean useReadAheadIterator) {
- this.useReadAheadIterator = useReadAheadIterator;
+
+ if (log.isDebugEnabled()) {
+ log.debug("All or terms are indexed");
}
-
- public String getReverseIndexTableName() {
- return reverseIndexTableName;
- }
-
- public void setReverseIndexTableName(String reverseIndexTableName) {
- this.reverseIndexTableName = reverseIndexTableName;
- }
-
- public List<String> getUnevaluatedFields() {
- return unevaluatedFields;
- }
-
- public void setUnevaluatedFields(List<String> unevaluatedFields) {
- this.unevaluatedFields = unevaluatedFields;
- }
-
- public void setUnevaluatedFields(String unevaluatedFieldList) {
- this.unevaluatedFields = new ArrayList<String>();
- for (String field : unevaluatedFieldList.split(","))
- this.unevaluatedFields.add(field);
- }
-
- public int getNumPartitions() {
- return numPartitions;
- }
-
- public void setNumPartitions(int numPartitions) {
- this.numPartitions = numPartitions;
- }
-
- public Document createDocument(Key key, Value value) {
- eventFields.clear();
- ByteBuffer buf = ByteBuffer.wrap(value.get());
- eventFields.readObjectData(kryo, buf);
-
- Document doc = new Document();
- //Set the id to the document id which is located in the colf
- String row = key.getRow().toString();
- String colf = key.getColumnFamily().toString();
- int idx = colf.indexOf(NULL_BYTE);
- String type = colf.substring(0, idx);
- String id = colf.substring(idx+1);
- doc.setId(id);
- for (Entry<String, Collection<FieldValue>> entry : eventFields.asMap().entrySet()) {
- for (FieldValue fv : entry.getValue()) {
- Field val = new Field();
- val.setFieldName(entry.getKey());
- val.setFieldValue(new String(fv.getValue(), Charset.forName("UTF-8")));
- doc.getFields().add(val);
+
+ if (!unsupportedOperatorSpecified
+ && (((null == orTerms || orTerms.isEmpty()) && indexedTerms.size() > 0) || (fields.size() > 0 && indexedTerms.size() == fields.size()) || orsAllIndexed)) {
+ optimizedQuery.start();
+ // Set up intersecting iterator over field index.
+
+ // Get information from the global index for the indexed terms. The results object will contain the term
+ // mapped to an object that contains the total count, and partitions where this term is located.
+
+ // TODO: Should we cache indexed term information or does that not make sense since we are always loading data
+ queryGlobalIndex.start();
+ IndexRanges termIndexInfo;
+ Set<String> indexedColumns;
+ try {
+ // If fields is null or zero, then it's probably the case that the user entered a value
+ // to search for with no fields. Check for the value in index.
+ if (fields.isEmpty()) {
+ termIndexInfo = this.getTermIndexInformation(connector, auths, queryString, typeFilter);
+ if (null != termIndexInfo && termIndexInfo.getRanges().isEmpty()) {
+ // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
+ // in unhandled locations.
+ // Break out of here by throwing a named exception and do full scan
+ throw new DoNotPerformOptimizedQueryException();
+ }
+ // We need to rewrite the query string here so that it's valid.
+ if (termIndexInfo instanceof UnionIndexRanges) {
+ UnionIndexRanges union = (UnionIndexRanges) termIndexInfo;
+ StringBuilder buf = new StringBuilder();
+ String sep = "";
+ for (String fieldName : union.getFieldNamesAndValues().keySet()) {
+ buf.append(sep).append(fieldName).append(" == ");
+ if (!(queryString.startsWith("'") && queryString.endsWith("'"))) {
+ buf.append("'").append(queryString).append("'");
+ } else {
+ buf.append(queryString);
+ }
+ sep = " or ";
}
- }
-
- //Add the pointer for the content.
- Field docPointer = new Field();
- docPointer.setFieldName("DOCUMENT");
- docPointer.setFieldValue("DOCUMENT:"+row+"/"+type+"/"+id);
- doc.getFields().add(docPointer);
-
- return doc;
- }
-
- public String getResultsKey(Entry<Key, Value> key) {
- //Use the colf from the table, it contains the uuid and datatype
- return key.getKey().getColumnFamily().toString();
- }
-
- public Results runQuery(Connector connector, List<String> authorizations, String query, Date beginDate, Date endDate, Set<String> types) {
-
- if (StringUtils.isEmpty(query)) {
- throw new IllegalArgumentException("NULL QueryNode reference passed to " + this.getClass().getSimpleName());
- }
-
- Set<Range> ranges = new HashSet<Range>();
- Set<String> typeFilter = types;
- Authorizations auths = new Authorizations(StringUtils.join(authorizations, "|"));
- Results results = new Results();
-
- //Get the query string
- String queryString = query;
-
-
- StopWatch abstractQueryLogic = new StopWatch();
- StopWatch optimizedQuery = new StopWatch();
- StopWatch queryGlobalIndex = new StopWatch();
- StopWatch optimizedEventQuery = new StopWatch();
- StopWatch fullScanQuery = new StopWatch();
- StopWatch processResults = new StopWatch();
-
- abstractQueryLogic.start();
-
- StopWatch parseQuery = new StopWatch();
- parseQuery.start();
-
- QueryParser parser;
- try {
- if(log.isDebugEnabled()){
- log.debug("ShardQueryLogic calling QueryParser.execute");
+ if (log.isDebugEnabled()) {
+ log.debug("Rewrote query for non-fielded single term query: " + queryString + " to " + buf.toString());
}
- parser = new QueryParser();
- parser.execute(queryString);
- } catch (org.apache.commons.jexl2.parser.ParseException e1) {
- throw new IllegalArgumentException("Error parsing query", e1);
- }
- int hash = parser.getHashValue();
- parseQuery.stop();
- if (log.isDebugEnabled()) {
- log.debug(hash + " Query: " + queryString);
- }
-
- Set<String> fields = new HashSet<String>();
- for(String f : parser.getQueryIdentifiers()){
- fields.add(f);
- }
- if(log.isDebugEnabled()){
- log.debug("getQueryIdentifiers: "+parser.getQueryIdentifiers().toString());
- }
- //Remove any negated fields from the fields list, we don't want to lookup negated fields
- //in the index.
- fields.removeAll(parser.getNegatedTermsForOptimizer());
-
- if(log.isDebugEnabled()){
- log.debug("getQueryIdentifiers: "+parser.getQueryIdentifiers().toString());
- }
- //Get the mapping of field name to QueryTerm object from the query. The query term object
- //contains the operator, whether its negated or not, and the literal to test against.
- Multimap<String, QueryTerm> terms = parser.getQueryTerms();
-
- //Find out which terms are indexed
- //TODO: Should we cache indexed terms or does that not make sense since we are always
- //loading data.
- StopWatch queryMetadata = new StopWatch();
- queryMetadata.start();
- Map<String, Multimap<String, Class<? extends Normalizer>>> metadataResults;
- try {
- metadataResults = findIndexedTerms(connector, auths, fields, typeFilter);
- } catch (Exception e1) {
- throw new RuntimeException("Error in metadata lookup", e1);
- }
+ queryString = buf.toString();
+ // We also need to set the set of indexed terms since we found these in the index.
+ indexedColumns = union.getFieldNamesAndValues().keySet();
+ } else {
+ throw new RuntimeException("Unexpected IndexRanges implementation");
+ }
+ } else {
+ RangeCalculator calc = this.getTermIndexInformation(connector, auths, indexedTerms, terms, this.getIndexTableName(), this.getReverseIndexTableName(),
+ queryString, this.queryThreads, typeFilter);
+ if (null == calc.getResult() || calc.getResult().isEmpty()) {
+ // Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
+ // in unhandled locations.
+ // Break out of here by throwing a named exception and do full scan
+ throw new DoNotPerformOptimizedQueryException();
+ }
+ termIndexInfo = new UnionIndexRanges();
+ termIndexInfo.setIndexValuesToOriginalValues(calc.getIndexValues());
+ termIndexInfo.setFieldNamesAndValues(calc.getIndexEntries());
+ termIndexInfo.getTermCardinality().putAll(calc.getTermCardinalities());
+ for (Range r : calc.getResult()) {
+ // foo is a placeholder and is ignored.
+ termIndexInfo.add("foo", r);
+ }
+ indexedColumns = termIndexInfo.getFieldNamesAndValues().keySet();
+ }
+ } catch (TableNotFoundException e) {
+ log.error(this.getIndexTableName() + "not found", e);
+ throw new RuntimeException(this.getIndexTableName() + "not found", e);
+ } catch (org.apache.commons.jexl2.parser.ParseException e) {
+ throw new RuntimeException("Error determining ranges for query: " + queryString, e);
+ } catch (DoNotPerformOptimizedQueryException e) {
+ log.info("Indexed fields not found in index, performing full scan");
+ termIndexInfo = null;
+ }
+ queryGlobalIndex.stop();
+
+ // Determine if we should proceed with optimized query based on results from the global index
+ boolean proceed = false;
+ if (null == termIndexInfo || termIndexInfo.getFieldNamesAndValues().values().size() == 0) {
+ proceed = false;
+ } else if (null != orTerms && orTerms.size() > 0 && (termIndexInfo.getFieldNamesAndValues().values().size() == indexedTerms.size())) {
+ proceed = true;
+ } else if (termIndexInfo.getFieldNamesAndValues().values().size() > 0) {
+ proceed = true;
+ } else if (orsAllIndexed) {
+ proceed = true;
+ } else {
+ proceed = false;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Proceed with optimized query: " + proceed);
+ if (null != termIndexInfo)
+ log.debug("termIndexInfo.getTermsFound().size(): " + termIndexInfo.getFieldNamesAndValues().values().size() + " indexedTerms.size: "
+ + indexedTerms.size() + " fields.size: " + fields.size());
+ }
+ if (proceed) {
- //Create a map of indexed term to set of normalizers for it
- Multimap<String, Normalizer> indexedTerms = HashMultimap.create();
- for (Entry<String, Multimap<String, Class<? extends Normalizer>>> entry : metadataResults.entrySet()) {
- //Get the normalizer from the normalizer cache
- for (Class<? extends Normalizer> clazz : entry.getValue().values()) {
- indexedTerms.put(entry.getKey(), normalizerCacheMap.get(clazz));
- }
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Performing optimized query");
}
- queryMetadata.stop();
+ // Use the scan ranges from the GlobalIndexRanges object as the ranges for the batch scanner
+ ranges = termIndexInfo.getRanges();
if (log.isDebugEnabled()) {
- log.debug(hash + " Indexed Terms: " + indexedTerms.toString());
+ log.info(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
}
-
- Set<String> orTerms = parser.getOrTermsForOptimizer();
-
- //Iterate over the query terms to get the operators specified in the query.
- ArrayList<String> unevaluatedExpressions = new ArrayList<String>();
- boolean unsupportedOperatorSpecified = false;
- for (Entry<String,QueryTerm> entry : terms.entries()) {
- if (null == entry.getValue()) {
- continue;
+
+ // Create BatchScanner, set the ranges, and setup the iterators.
+ optimizedEventQuery.start();
+ BatchScanner bs = null;
+ try {
+ bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
+ bs.setRanges(ranges);
+ IteratorSetting si = new IteratorSetting(21, "eval", OptimizedQueryIterator.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
+ }
+ // Set the query option
+ si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
+ // Set the Indexed Terms List option. This is the field name and normalized field value pair separated
+ // by a comma.
+ StringBuilder buf = new StringBuilder();
+ String sep = "";
+ for (Entry<String,String> entry : termIndexInfo.getFieldNamesAndValues().entries()) {
+ buf.append(sep);
+ buf.append(entry.getKey());
+ buf.append(":");
+ buf.append(termIndexInfo.getIndexValuesToOriginalValues().get(entry.getValue()));
+ buf.append(":");
+ buf.append(entry.getValue());
+ if (sep.equals("")) {
+ sep = ";";
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Setting scan option: " + FieldIndexQueryReWriter.INDEXED_TERMS_LIST + " to " + buf.toString());
+ }
+ FieldIndexQueryReWriter rewriter = new FieldIndexQueryReWriter();
+ String q = "";
+ try {
+ q = queryString;
+ q = rewriter.applyCaseSensitivity(q, true, false);// Set upper/lower case for fieldname/fieldvalue
+ Map<String,String> opts = new HashMap<String,String>();
+ opts.put(FieldIndexQueryReWriter.INDEXED_TERMS_LIST, buf.toString());
+ q = rewriter.removeNonIndexedTermsAndInvalidRanges(q, opts);
+ q = rewriter.applyNormalizedTerms(q, opts);
+ if (log.isDebugEnabled()) {
+ log.debug("runServerQuery, FieldIndex Query: " + q);
}
-
- if (null != this.unevaluatedFields && this.unevaluatedFields.contains(entry.getKey().trim())) {
- unevaluatedExpressions.add(entry.getKey().trim() + " " + entry.getValue().getOperator() + " " + entry.getValue().getValue());
+ } catch (org.apache.commons.jexl2.parser.ParseException ex) {
+ log.error("Could not parse query, Jexl ParseException: " + ex);
+ } catch (Exception ex) {
+ log.error("Problem rewriting query, Exception: " + ex.getMessage());
+ }
+ si.addOption(BooleanLogicIterator.FIELD_INDEX_QUERY, q);
+
+ // Set the term cardinality option
+ sep = "";
+ buf.delete(0, buf.length());
+ for (Entry<String,Long> entry : termIndexInfo.getTermCardinality().entrySet()) {
+ buf.append(sep);
+ buf.append(entry.getKey());
+ buf.append(":");
+ buf.append(entry.getValue());
+ sep = ",";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting scan option: " + BooleanLogicIterator.TERM_CARDINALITIES + " to " + buf.toString());
+ si.addOption(BooleanLogicIterator.TERM_CARDINALITIES, buf.toString());
+ if (this.useReadAheadIterator) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enabling read ahead iterator with queue size: " + this.readAheadQueueSize + " and timeout: " + this.readAheadTimeOut);
}
+ si.addOption(ReadAheadIterator.QUEUE_SIZE, this.readAheadQueueSize);
+ si.addOption(ReadAheadIterator.TIMEOUT, this.readAheadTimeOut);
- int operator = JexlOperatorConstants.getJJTNodeType(entry.getValue().getOperator());
- if (!(operator == ParserTreeConstants.JJTEQNODE
- || operator == ParserTreeConstants.JJTNENODE
- || operator == ParserTreeConstants.JJTLENODE
- || operator == ParserTreeConstants.JJTLTNODE
- || operator == ParserTreeConstants.JJTGENODE
- || operator == ParserTreeConstants.JJTGTNODE
- || operator == ParserTreeConstants.JJTERNODE)) {
- unsupportedOperatorSpecified = true;
- break;
+ }
+
+ if (null != unevaluatedExpressions) {
+ StringBuilder unevaluatedExpressionList = new StringBuilder();
+ String sep2 = "";
+ for (String exp : unevaluatedExpressions) {
+ unevaluatedExpressionList.append(sep2).append(exp);
+ sep2 = ",";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
+ si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
+ }
+
+ bs.addScanIterator(si);
+
+ processResults.start();
+ processResults.suspend();
+ long count = 0;
+ for (Entry<Key,Value> entry : bs) {
+ count++;
+ if (log.isDebugEnabled()) {
+ log.debug("Key: " + entry.getKey());// + "\nValue: " + entry.getValue() + "\n");
}
- }
- if (null != unevaluatedExpressions)
- unevaluatedExpressions.trimToSize();
+ // The key that is returned by the EvaluatingIterator is not the same key that is in
+ // the table. The value that is returned by the EvaluatingIterator is a kryo
+ // serialized EventFields object.
+ processResults.resume();
+ Document d = this.createDocument(entry.getKey(), entry.getValue());
+ results.getResults().add(d);
+ processResults.suspend();
+ }
+ log.info(count + " matching entries found in optimized query.");
+ optimizationSucceeded = true;
+ processResults.stop();
+ } catch (TableNotFoundException e) {
+ log.error(this.getTableName() + "not found", e);
+ throw new RuntimeException(this.getIndexTableName() + "not found", e);
+ } finally {
+ if (bs != null) {
+ bs.close();
+ }
+ }
+ optimizedEventQuery.stop();
+ }
+ optimizedQuery.stop();
+ }
+
+ // WE should look into finding a better way to handle whether we do an optimized query or not.
+ // We are not setting up an else condition here because we may have aborted the logic early in the if statement.
+ if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()) && !orsAllIndexed)) {
+ // if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()))) {
+ fullScanQuery.start();
+ if (log.isDebugEnabled()) {
+ log.debug(hash + " Performing full scan query");
+ }
+
+ // Set up a full scan using the date ranges from the query
+ // Create BatchScanner, set the ranges, and setup the iterators.
+ BatchScanner bs = null;
+ try {
+ // The ranges are the start and end dates
+ Collection<Range> r = getFullScanRange(beginDate, endDate, terms);
+ ranges.addAll(r);
+
if (log.isDebugEnabled()) {
- log.debug(hash + " unsupportedOperators: " + unsupportedOperatorSpecified
- + " indexedTerms: " + indexedTerms.toString() + " orTerms: " + orTerms.toString() +
- " unevaluatedExpressions: " + unevaluatedExpressions.toString());
- }
-
- //We can use the intersecting iterator over the field index as an optimization under the
- //following conditions
- //
- // 1. No unsupported operators in the query.
- // 2. No 'or' operators and at least one term indexed
- // or
- // 1. No unsupported operators in the query.
- // 2. and all terms indexed
- // or
- // 1. All or'd terms are indexed. NOTE, this will potentially skip some queries and push to a full table scan
- //// WE should look into finding a better way to handle whether we do an optimized query or not.
- boolean optimizationSucceeded = false;
- boolean orsAllIndexed = false;
- if(orTerms.isEmpty()){
- orsAllIndexed = false;
- }else{
- orsAllIndexed = indexedTerms.keySet().containsAll(orTerms);
+ log.debug(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
}
-
- if(log.isDebugEnabled()){
- log.debug("All or terms are indexed");
+
+ bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
+ bs.setRanges(ranges);
+ IteratorSetting si = new IteratorSetting(22, "eval", EvaluatingIterator.class);
+ // Create datatype regex if needed
+ if (null != typeFilter) {
+ StringBuilder buf = new StringBuilder();
+ String s = "";
+ for (String type : typeFilter) {
+ buf.append(s).append(type).append(".*");
+ s = "|";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting colf regex iterator to: " + buf.toString());
+ IteratorSetting ri = new IteratorSetting(21, "typeFilter", RegExIterator.class);
+ ri.addOption(RegExFilter.COLF_REGEX, buf.toString());
+ bs.addScanIterator(ri);
}
-
- if (!unsupportedOperatorSpecified && (((null == orTerms || orTerms.isEmpty()) && indexedTerms.size() > 0) || (fields.size() > 0 && indexedTerms.size() == fields.size()) ||orsAllIndexed ) ) {
- optimizedQuery.start();
- //Set up intersecting iterator over field index.
-
- //Get information from the global index for the indexed terms. The results object will contain the term
- //mapped to an object that contains the total count, and partitions where this term is located.
-
- //TODO: Should we cache indexed term information or does that not make sense since we are always loading data
- queryGlobalIndex.start();
- IndexRanges termIndexInfo;
- Set<String> indexedColumns;
- try {
- //If fields is null or zero, then it's probably the case that the user entered a value
- //to search for with no fields. Check for the value in index.
- if (fields.isEmpty()) {
- termIndexInfo = this.getTermIndexInformation(connector, auths, queryString, typeFilter);
- if (null != termIndexInfo && termIndexInfo.getRanges().isEmpty()) {
- //Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
- //in unhandled locations.
- //Break out of here by throwing a named exception and do full scan
- throw new DoNotPerformOptimizedQueryException();
- }
- //We need to rewrite the query string here so that it's valid.
- if (termIndexInfo instanceof UnionIndexRanges) {
- UnionIndexRanges union = (UnionIndexRanges) termIndexInfo;
- StringBuilder buf = new StringBuilder();
- String sep = "";
- for (String fieldName : union.getFieldNamesAndValues().keySet()) {
- buf.append(sep).append(fieldName).append(" == ");
- if (!(queryString.startsWith("'") && queryString.endsWith("'"))) {
- buf.append("'").append(queryString).append("'");
- } else {
- buf.append(queryString);
- }
- sep = " or ";
- }
- if (log.isDebugEnabled()) {
- log.debug("Rewrote query for non-fielded single term query: " + queryString + " to " + buf.toString());
- }
- queryString = buf.toString();
- //We also need to set the set of indexed terms since we found these in the index.
- indexedColumns = union.getFieldNamesAndValues().keySet();
- } else {
- throw new RuntimeException("Unexpected IndexRanges implementation");
- }
- } else {
- RangeCalculator calc = this.getTermIndexInformation(connector, auths, indexedTerms, terms, this.getIndexTableName(), this.getReverseIndexTableName(), queryString, this.queryThreads, typeFilter);
- if (null == calc.getResult() || calc.getResult().isEmpty()) {
- //Then we didn't find anything in the index for this query. This may happen for an indexed term that has wildcards
- //in unhandled locations.
- //Break out of here by throwing a named exception and do full scan
- throw new DoNotPerformOptimizedQueryException();
- }
- termIndexInfo = new UnionIndexRanges();
- termIndexInfo.setIndexValuesToOriginalValues(calc.getIndexValues());
- termIndexInfo.setFieldNamesAndValues(calc.getIndexEntries());
- termIndexInfo.getTermCardinality().putAll(calc.getTermCardinalities());
- for (Range r : calc.getResult()) {
- //foo is a placeholder and is ignored.
- termIndexInfo.add("foo", r);
- }
- indexedColumns = termIndexInfo.getFieldNamesAndValues().keySet();
- }
- } catch (TableNotFoundException e) {
- log.error(this.getIndexTableName() + "not found", e);
- throw new RuntimeException(this.getIndexTableName() + "not found", e);
- } catch (org.apache.commons.jexl2.parser.ParseException e) {
- throw new RuntimeException("Error determining ranges for query: " + queryString, e);
- } catch (DoNotPerformOptimizedQueryException e) {
- log.info("Indexed fields not found in index, performing full scan");
- termIndexInfo = null;
- }
- queryGlobalIndex.stop();
-
- //Determine if we should proceed with optimized query based on results from the global index
- boolean proceed = false;
- if (null == termIndexInfo || termIndexInfo.getFieldNamesAndValues().values().size() == 0) {
- proceed = false;
- } else if (null != orTerms && orTerms.size() > 0 && (termIndexInfo.getFieldNamesAndValues().values().size() == indexedTerms.size())) {
- proceed = true;
- } else if (termIndexInfo.getFieldNamesAndValues().values().size() > 0) {
- proceed = true;
- }else if(orsAllIndexed){
- proceed = true;
- } else {
- proceed = false;
- }
- if (log.isDebugEnabled()) {
- log.debug("Proceed with optimized query: " + proceed);
- if (null != termIndexInfo)
- log.debug("termIndexInfo.getTermsFound().size(): " + termIndexInfo.getFieldNamesAndValues().values().size() + " indexedTerms.size: " + indexedTerms.size() + " fields.size: " + fields.size());
- }
- if (proceed) {
-
- if (log.isDebugEnabled()) {
- log.debug(hash + " Performing optimized query");
- }
- //Use the scan ranges from the GlobalIndexRanges object as the ranges for the batch scanner
- ranges = termIndexInfo.getRanges();
- if (log.isDebugEnabled()) {
- log.info(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
- }
-
- //Create BatchScanner, set the ranges, and setup the iterators.
- optimizedEventQuery.start();
- BatchScanner bs = null;
- try {
- bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
- bs.setRanges(ranges);
- IteratorSetting si = new IteratorSetting(21, "eval", OptimizedQueryIterator.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
- }
- //Set the query option
- si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
- //Set the Indexed Terms List option. This is the field name and normalized field value pair separated
- //by a comma.
- StringBuilder buf = new StringBuilder();
- String sep = "";
- for (Entry<String, String> entry : termIndexInfo.getFieldNamesAndValues().entries()) {
- buf.append(sep);
- buf.append(entry.getKey());
- buf.append(":");
- buf.append(termIndexInfo.getIndexValuesToOriginalValues().get(entry.getValue()));
- buf.append(":");
- buf.append(entry.getValue());
- if (sep.equals("")) {
- sep = ";";
- }
- }
- if (log.isDebugEnabled()) {
- log.debug("Setting scan option: " + FieldIndexQueryReWriter.INDEXED_TERMS_LIST + " to " + buf.toString());
- }
- FieldIndexQueryReWriter rewriter = new FieldIndexQueryReWriter();
- String q="";
- try {
- q=queryString;
- q = rewriter.applyCaseSensitivity(q, true, false);//Set upper/lower case for fieldname/fieldvalue
- Map<String,String> opts = new HashMap<String,String>();
- opts.put(FieldIndexQueryReWriter.INDEXED_TERMS_LIST, buf.toString());
- q = rewriter.removeNonIndexedTermsAndInvalidRanges(q, opts);
- q = rewriter.applyNormalizedTerms(q, opts);
- if(log.isDebugEnabled()){
- log.debug("runServerQuery, FieldIndex Query: "+q);
- }
- } catch (org.apache.commons.jexl2.parser.ParseException ex) {
- log.error("Could not parse query, Jexl ParseException: "+ex);
- } catch (Exception ex) {
- log.error("Problem rewriting query, Exception: "+ex.getMessage());
- }
- si.addOption(BooleanLogicIterator.FIELD_INDEX_QUERY,q);
-
- //Set the term cardinality option
- sep = "";
- buf.delete(0, buf.length());
- for (Entry<String, Long> entry : termIndexInfo.getTermCardinality().entrySet()) {
- buf.append(sep);
- buf.append(entry.getKey());
- buf.append(":");
- buf.append(entry.getValue());
- sep = ",";
- }
- if (log.isDebugEnabled())
- log.debug("Setting scan option: " + BooleanLogicIterator.TERM_CARDINALITIES + " to " + buf.toString());
- si.addOption(BooleanLogicIterator.TERM_CARDINALITIES, buf.toString());
- if (this.useReadAheadIterator) {
- if (log.isDebugEnabled()) {
- log.debug("Enabling read ahead iterator with queue size: " + this.readAheadQueueSize
- + " and timeout: " + this.readAheadTimeOut);
- }
- si.addOption(ReadAheadIterator.QUEUE_SIZE, this.readAheadQueueSize);
- si.addOption(ReadAheadIterator.TIMEOUT, this.readAheadTimeOut);
-
- }
-
- if (null != unevaluatedExpressions) {
- StringBuilder unevaluatedExpressionList = new StringBuilder();
- String sep2 = "";
- for (String exp : unevaluatedExpressions) {
- unevaluatedExpressionList.append(sep2).append(exp);
- sep2 = ",";
- }
- if (log.isDebugEnabled())
- log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
- si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
- }
-
- bs.addScanIterator(si);
-
- processResults.start();
- processResults.suspend();
- long count = 0;
- for (Entry<Key, Value> entry : bs) {
- count++;
- if (log.isDebugEnabled()) {
- log.debug("Key: " + entry.getKey());// + "\nValue: " + entry.getValue() + "\n");
- }
- //The key that is returned by the EvaluatingIterator is not the same key that is in
- //the table. The value that is returned by the EvaluatingIterator is a kryo
- //serialized EventFields object.
- processResults.resume();
- Document d = this.createDocument(entry.getKey(), entry.getValue());
- results.getResults().add(d);
- processResults.suspend();
- }
- log.info(count + " matching entries found in optimized query.");
- optimizationSucceeded = true;
- processResults.stop();
- } catch (TableNotFoundException e) {
- log.error(this.getTableName() + "not found", e);
- throw new RuntimeException(this.getIndexTableName() + "not found", e);
- } finally {
- if (bs != null) {
- bs.close();
- }
- }
- optimizedEventQuery.stop();
- }
- optimizedQuery.stop();
+ if (log.isDebugEnabled()) {
+ log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
}
-
- // WE should look into finding a better way to handle whether we do an optimized query or not.
- //We are not setting up an else condition here because we may have aborted the logic early in the if statement.
- if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()) && !orsAllIndexed)) {
- //if (!optimizationSucceeded || ((null != orTerms && orTerms.size() > 0) && (indexedTerms.size() != fields.size()))) {
- fullScanQuery.start();
- if (log.isDebugEnabled()) {
- log.debug(hash + " Performing full scan query");
- }
-
- //Set up a full scan using the date ranges from the query
- //Create BatchScanner, set the ranges, and setup the iterators.
- BatchScanner bs = null;
- try {
- //The ranges are the start and end dates
- Collection<Range> r = getFullScanRange(beginDate, endDate, terms);
- ranges.addAll(r);
-
- if (log.isDebugEnabled()) {
- log.debug(hash + " Ranges: count: " + ranges.size() + ", " + ranges.toString());
- }
-
- bs = connector.createBatchScanner(this.getTableName(), auths, queryThreads);
- bs.setRanges(ranges);
- IteratorSetting si = new IteratorSetting(22, "eval", EvaluatingIterator.class);
- //Create datatype regex if needed
- if (null != typeFilter) {
- StringBuilder buf = new StringBuilder();
- String s = "";
- for (String type : typeFilter) {
- buf.append(s).append(type).append(".*");
- s = "|";
- }
- if (log.isDebugEnabled())
- log.debug("Setting colf regex iterator to: " + buf.toString());
- IteratorSetting ri = new IteratorSetting(21, "typeFilter", RegExIterator.class);
- ri.addOption(RegExFilter.COLF_REGEX, buf.toString());
- bs.addScanIterator(ri);
- }
- if (log.isDebugEnabled()) {
- log.debug("Setting scan option: " + EvaluatingIterator.QUERY_OPTION + " to " + queryString);
- }
- si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
- if (null != unevaluatedExpressions) {
- StringBuilder unevaluatedExpressionList = new StringBuilder();
- String sep2 = "";
- for (String exp : unevaluatedExpressions) {
- unevaluatedExpressionList.append(sep2).append(exp);
- sep2 = ",";
- }
- if (log.isDebugEnabled())
- log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
- si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
- }
- bs.addScanIterator(si);
- long count = 0;
- processResults.start();
- processResults.suspend();
- for (Entry<Key, Value> entry : bs) {
- count++;
- //The key that is returned by the EvaluatingIterator is not the same key that is in
- //the partition table. The value that is returned by the EvaluatingIterator is a kryo
- //serialized EventFields object.
- processResults.resume();
- Document d = this.createDocument(entry.getKey(), entry.getValue());
- results.getResults().add(d);
- processResults.suspend();
- }
- processResults.stop();
- log.info(count + " matching entries found in full scan query.");
- } catch (TableNotFoundException e) {
- log.error(this.getTableName() + "not found", e);
- } finally {
- if (bs != null) {
- bs.close();
- }
- }
- fullScanQuery.stop();
+ si.addOption(EvaluatingIterator.QUERY_OPTION, queryString);
+ if (null != unevaluatedExpressions) {
+ StringBuilder unevaluatedExpressionList = new StringBuilder();
+ String sep2 = "";
+ for (String exp : unevaluatedExpressions) {
+ unevaluatedExpressionList.append(sep2).append(exp);
+ sep2 = ",";
+ }
+ if (log.isDebugEnabled())
+ log.debug("Setting scan option: " + EvaluatingIterator.UNEVALUTED_EXPRESSIONS + " to " + unevaluatedExpressionList.toString());
+ si.addOption(EvaluatingIterator.UNEVALUTED_EXPRESSIONS, unevaluatedExpressionList.toString());
+ }
+ bs.addScanIterator(si);
+ long count = 0;
+ processResults.start();
+ processResults.suspend();
+ for (Entry<Key,Value> entry : bs) {
+ count++;
+ // The key that is returned by the EvaluatingIterator is not the same key that is in
+ // the partition table. The value that is returned by the EvaluatingIterator is a kryo
+ // serialized EventFields object.
+ processResults.resume();
+ Document d = this.createDocument(entry.getKey(), entry.getValue());
+ results.getResults().add(d);
+ processResults.suspend();
+ }
+ processResults.stop();
+ log.info(count + " matching entries found in full scan query.");
+ } catch (TableNotFoundException e) {
+ log.error(this.getTableName() + "not found", e);
+ } finally {
+ if (bs != null) {
+ bs.close();
}
-
- log.info("AbstractQueryLogic: " + queryString + " " + timeString(abstractQueryLogic.getTime()));
- log.info(" 1) parse query " + timeString(parseQuery.getTime()));
- log.info(" 2) query metadata " + timeString(queryMetadata.getTime()));
- log.info(" 3) full scan query " + timeString(fullScanQuery.getTime()));
- log.info(" 3) optimized query " + timeString(optimizedQuery.getTime()));
- log.info(" 1) process results " + timeString(processResults.getTime()));
- log.info(" 1) query global index " + timeString(queryGlobalIndex.getTime()));
- log.info(hash + " Query completed.");
-
- return results;
- }
- private static String timeString(long millis) {
- return String.format("%4.2f", millis / 1000.);
+ }
+ fullScanQuery.stop();
}
+ log.info("AbstractQueryLogic: " + queryString + " " + timeString(abstractQueryLogic.getTime()));
+ log.info(" 1) parse query " + timeString(parseQuery.getTime()));
+ log.info(" 2) query metadata " + timeString(queryMetadata.getTime()));
+ log.info(" 3) full scan query " + timeString(fullScanQuery.getTime()));
+ log.info(" 3) optimized query " + timeString(optimizedQuery.getTime()));
+ log.info(" 1) process results " + timeString(processResults.getTime()));
+ log.info(" 1) query global index " + timeString(queryGlobalIndex.getTime()));
+ log.info(hash + " Query completed.");
+
+ return results;
+ }
+
+ private static String timeString(long millis) { // Renders a millisecond duration as seconds for the timing log lines.
+ return String.format("%4.2f", millis / 1000.); // "1000." is a double literal, so the division is floating-point; output has two decimals, minimum width 4.
+ }
+
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/ContentLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/ContentLogic.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/ContentLogic.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/ContentLogic.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package logic;
import ingest.WikipediaMapper;
@@ -39,77 +39,72 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.security.Authorizations;
/**
- * This query table implementation returns a Results object that contains documents
- * from the wiki table. The query will contain the partition id, wikitype, and UID so that we
- * can seek directly to the document. The document is stored as base64 compressed binary
- * in the Accumulo table. We will decompress the data so that it is base64 encoded binary
- * data in the Results object.
+ * This query table implementation returns a Results object that contains documents from the wiki table. The query will contain the partition id, wikitype, and
+ * UID so that we can seek directly to the document. The document is stored as base64 compressed binary in the Accumulo table. We will decompress the data so
+ * that it is base64 encoded binary data in the Results object.
+ *
+ * The query that needs to be passed to the web service is: DOCUMENT:partitionId/wikitype/uid.
*
- * The query that needs to be passed to the web service is:
- * DOCUMENT:partitionId/wikitype/uid.
- *
*/
public class ContentLogic {
-
- private static final Logger log = Logger.getLogger(ContentLogic.class);
-
- private static final String NULL_BYTE = "\u0000";
-
- private String tableName = null;
-
- private Pattern queryPattern = Pattern.compile("^DOCUMENT:(.*)/(.*)/(.*)$");
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public Results runQuery(Connector connector, String query, List<String> authorizations) {
-
- Results results = new Results();
- Authorizations auths = new Authorizations(StringUtils.join(authorizations, "|"));
-
-
- Matcher match = queryPattern.matcher(query);
- if (!match.matches()) {
- throw new IllegalArgumentException("Query does not match the pattern: DOCUMENT:partitionId/wikitype/uid, your query: " + query.toString());
- } else {
- String partitionId = match.group(1);
- String wikitype = match.group(2);
- String id = match.group(3);
-
- log.debug("Received pieces: " + partitionId + ", " + wikitype + ", " + id);
-
- //Create the Range
- Key startKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype+NULL_BYTE+id);
- Key endKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype+NULL_BYTE+id+NULL_BYTE);
- Range r = new Range(startKey, true, endKey, false);
-
- log.debug("Setting range: " + r);
-
- try {
- Scanner scanner = connector.createScanner(this.getTableName(), auths);
- scanner.setRange(r);
- //This should in theory only match one thing.
- for (Entry<Key,Value> entry : scanner) {
- Document doc = new Document();
- doc.setId(id);
- Field val = new Field();
- val.setFieldName("DOCUMENT");
- val.setFieldValue(new String(Base64.decode(entry.getValue().toString())));
- doc.getFields().add(val);
- results.getResults().add(doc);
- }
- } catch (TableNotFoundException e) {
- throw new RuntimeException("Table not found: " + this.getTableName(), e);
- }
-
- }
- return results;
- }
-
-
+
+ private static final Logger log = Logger.getLogger(ContentLogic.class);
+
+ private static final String NULL_BYTE = "\u0000";
+
+ private String tableName = null;
+
+ private Pattern queryPattern = Pattern.compile("^DOCUMENT:(.*)/(.*)/(.*)$");
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public Results runQuery(Connector connector, String query, List<String> authorizations) {
+
+ Results results = new Results();
+ Authorizations auths = new Authorizations(StringUtils.join(authorizations, "|"));
+
+ Matcher match = queryPattern.matcher(query);
+ if (!match.matches()) {
+ throw new IllegalArgumentException("Query does not match the pattern: DOCUMENT:partitionId/wikitype/uid, your query: " + query.toString());
+ } else {
+ String partitionId = match.group(1);
+ String wikitype = match.group(2);
+ String id = match.group(3);
+
+ log.debug("Received pieces: " + partitionId + ", " + wikitype + ", " + id);
+
+ // Create the Range
+ Key startKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype + NULL_BYTE + id);
+ Key endKey = new Key(partitionId, WikipediaMapper.DOCUMENT_COLUMN_FAMILY, wikitype + NULL_BYTE + id + NULL_BYTE);
+ Range r = new Range(startKey, true, endKey, false);
+
+ log.debug("Setting range: " + r);
+
+ try {
+ Scanner scanner = connector.createScanner(this.getTableName(), auths);
+ scanner.setRange(r);
+ // This should in theory only match one thing.
+ for (Entry<Key,Value> entry : scanner) {
+ Document doc = new Document();
+ doc.setId(id);
+ Field val = new Field();
+ val.setFieldName("DOCUMENT");
+ val.setFieldValue(new String(Base64.decode(entry.getValue().toString())));
+ doc.getFields().add(val);
+ results.getResults().add(doc);
+ }
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException("Table not found: " + this.getTableName(), e);
+ }
+
+ }
+ return results;
+ }
+
}