You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2011/12/27 19:19:44 UTC
svn commit: r1224966 [1/10] - in /incubator/accumulo/branches/1.4: ./
contrib/accumulo_sample/
contrib/accumulo_sample/ingest/src/main/java/aggregator/
contrib/accumulo_sample/ingest/src/main/java/ingest/
contrib/accumulo_sample/ingest/src/test/java/ag...
Author: ecn
Date: Tue Dec 27 18:19:43 2011
New Revision: 1224966
URL: http://svn.apache.org/viewvc?rev=1224966&view=rev
Log:
ACCUMULO-230: merge sample problems found in testing trunk to 1.4 branch
Modified:
incubator/accumulo/branches/1.4/ (props changed)
incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicTreeNode.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/DefaultIteratorEnvironment.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/EvaluatingIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/FieldIndexIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/OptimizedQueryIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/parser/FieldIndexQueryReWriter.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/parser/QueryParser.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/parser/RangeCalculator.java
incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/sample/query/Query.java
Propchange: incubator/accumulo/branches/1.4/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 27 18:19:43 2011
@@ -1,4 +1,4 @@
/incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320
/incubator/accumulo/branches/1.3.5rc:1209938
/incubator/accumulo/branches/1.4:1205476
-/incubator/accumulo/trunk:1205476,1205570,1208726
+/incubator/accumulo/trunk:1205476,1205570,1208726,1222413,1222719,1222733-1222734
Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java Tue Dec 27 18:19:43 2011
@@ -18,11 +18,11 @@ package aggregator;
import java.util.HashSet;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.aggregation.Aggregator;
import org.apache.log4j.Logger;
import protobuf.Uid;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.aggregation.Aggregator;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -32,6 +32,7 @@ import com.google.protobuf.InvalidProtoc
* index for low cardinality terms (Low in this case being less than 20).
*
*/
+@SuppressWarnings("deprecation")
public class GlobalIndexUidAggregator implements Aggregator {
private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);
Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java Tue Dec 27 18:19:43 2011
@@ -20,11 +20,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.aggregation.Aggregator;
import org.apache.log4j.Logger;
import protobuf.TermWeight;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.aggregation.Aggregator;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -32,6 +32,7 @@ import com.google.protobuf.InvalidProtoc
* An Aggregator to merge together a list of term offsets and one normalized term frequency
*
*/
+@SuppressWarnings("deprecation")
public class TextIndexAggregator implements Aggregator {
private static final Logger log = Logger.getLogger(TextIndexAggregator.class);
Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java Tue Dec 27 18:19:43 2011
@@ -18,6 +18,11 @@ package ingest;
import java.io.IOException;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -25,119 +30,120 @@ import org.apache.hadoop.util.Reflection
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.SimpleAnalyzer;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-
-
public class WikipediaConfiguration {
- public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
- public final static String USER = "wikipedia.accumulo.user";
- public final static String PASSWORD = "wikipedia.accumulo.password";
- public final static String TABLE_NAME = "wikipedia.accumulo.table";
-
- public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
-
- public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
- public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
- public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
-
- public final static String ANALYZER = "wikipedia.index.analyzer";
-
- public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
-
- public static String getUser(Configuration conf) { return conf.get(USER); };
-
- public static byte[] getPassword(Configuration conf) {
- String pass = conf.get(PASSWORD);
- if (pass == null) {
- return null;
- }
- return pass.getBytes();
- }
-
- public static String getTableName(Configuration conf) {
- String tablename = conf.get(TABLE_NAME);
- if (tablename == null) {
- throw new RuntimeException("No data table name specified in " + TABLE_NAME);
- }
- return tablename;
- }
-
- public static String getInstanceName(Configuration conf) { return conf.get(INSTANCE_NAME); }
-
- public static String getZookeepers(Configuration conf) {
- String zookeepers = conf.get(ZOOKEEPERS);
- if (zookeepers == null) {
- throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
- }
- return zookeepers;
- }
-
- public static Path getNamespacesFile(Configuration conf) {
- String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
- return new Path(filename);
- }
- public static Path getLanguagesFile(Configuration conf) {
- String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
- return new Path(filename);
- }
- public static Path getWorkingDirectory(Configuration conf) {
- String filename = conf.get(WORKING_DIRECTORY);
- return new Path(filename);
- }
-
- public static Analyzer getAnalyzer(Configuration conf) throws IOException {
- Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
- return ReflectionUtils.newInstance(analyzerClass, conf);
- }
-
- public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
- return new Connector(getInstance(conf), getUser(conf), getPassword(conf));
- }
-
- public static Instance getInstance(Configuration conf) {
- return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
- }
-
- public static int getNumPartitions(Configuration conf) {
- return conf.getInt(NUM_PARTITIONS, 25);
- }
-
- /**
- * Helper method to get properties from Hadoop configuration
- * @param <T>
- * @param conf
- * @param propertyName
- * @param resultClass
- * @throws IllegalArgumentException if property is not defined, null, or empty. Or if resultClass is not handled.
- * @return value of property
- */
- @SuppressWarnings("unchecked")
- public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
- String p = conf.get(propertyName);
- if (StringUtils.isEmpty(p))
- throw new IllegalArgumentException(propertyName + " must be specified");
-
- if (resultClass.equals(String.class))
- return (T) p;
- else if (resultClass.equals(String[].class))
- return (T) conf.getStrings(propertyName);
- else if (resultClass.equals(Boolean.class))
- return (T) Boolean.valueOf(p);
- else if (resultClass.equals(Long.class))
- return (T) Long.valueOf(p);
- else if (resultClass.equals(Integer.class))
- return (T) Integer.valueOf(p);
- else if (resultClass.equals(Float.class))
- return (T) Float.valueOf(p);
- else if (resultClass.equals(Double.class))
- return (T) Double.valueOf(p);
- else
- throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
-
- }
-
+ public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
+ public final static String USER = "wikipedia.accumulo.user";
+ public final static String PASSWORD = "wikipedia.accumulo.password";
+ public final static String TABLE_NAME = "wikipedia.accumulo.table";
+
+ public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
+
+ public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
+ public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
+ public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
+
+ public final static String ANALYZER = "wikipedia.index.analyzer";
+
+ public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+
+ public static String getUser(Configuration conf) {
+ return conf.get(USER);
+ };
+
+ public static byte[] getPassword(Configuration conf) {
+ String pass = conf.get(PASSWORD);
+ if (pass == null) {
+ return null;
+ }
+ return pass.getBytes();
+ }
+
+ public static String getTableName(Configuration conf) {
+ String tablename = conf.get(TABLE_NAME);
+ if (tablename == null) {
+ throw new RuntimeException("No data table name specified in " + TABLE_NAME);
+ }
+ return tablename;
+ }
+
+ public static String getInstanceName(Configuration conf) {
+ return conf.get(INSTANCE_NAME);
+ }
+
+ public static String getZookeepers(Configuration conf) {
+ String zookeepers = conf.get(ZOOKEEPERS);
+ if (zookeepers == null) {
+ throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
+ }
+ return zookeepers;
+ }
+
+ public static Path getNamespacesFile(Configuration conf) {
+ String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
+ return new Path(filename);
+ }
+
+ public static Path getLanguagesFile(Configuration conf) {
+ String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
+ return new Path(filename);
+ }
+
+ public static Path getWorkingDirectory(Configuration conf) {
+ String filename = conf.get(WORKING_DIRECTORY);
+ return new Path(filename);
+ }
+
+ public static Analyzer getAnalyzer(Configuration conf) throws IOException {
+ Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
+ return ReflectionUtils.newInstance(analyzerClass, conf);
+ }
+
+ public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+ return getInstance(conf).getConnector(getUser(conf), getPassword(conf));
+ }
+
+ public static Instance getInstance(Configuration conf) {
+ return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
+ }
+
+ public static int getNumPartitions(Configuration conf) {
+ return conf.getInt(NUM_PARTITIONS, 25);
+ }
+
+ /**
+ * Helper method to get properties from Hadoop configuration
+ *
+ * @param <T>
+ * @param conf
+ * @param propertyName
+ * @param resultClass
+ * @throws IllegalArgumentException
+ * if property is not defined, null, or empty. Or if resultClass is not handled.
+ * @return value of property
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
+ String p = conf.get(propertyName);
+ if (StringUtils.isEmpty(p))
+ throw new IllegalArgumentException(propertyName + " must be specified");
+
+ if (resultClass.equals(String.class))
+ return (T) p;
+ else if (resultClass.equals(String[].class))
+ return (T) conf.getStrings(propertyName);
+ else if (resultClass.equals(Boolean.class))
+ return (T) Boolean.valueOf(p);
+ else if (resultClass.equals(Long.class))
+ return (T) Long.valueOf(p);
+ else if (resultClass.equals(Integer.class))
+ return (T) Integer.valueOf(p);
+ else if (resultClass.equals(Float.class))
+ return (T) Float.valueOf(p);
+ else if (resultClass.equals(Double.class))
+ return (T) Double.valueOf(p);
+ else
+ throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
+
+ }
+
}
Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java Tue Dec 27 18:19:43 2011
@@ -26,6 +26,18 @@ import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.aggregation.NumSummation;
+import org.apache.accumulo.core.iterators.aggregation.conf.AggregatorConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -40,19 +52,8 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import reader.AggregatingRecordReader;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.aggregation.NumSummation;
-import org.apache.accumulo.core.iterators.aggregation.conf.AggregatorConfiguration;
+@SuppressWarnings("deprecation")
public class WikipediaIngester extends Configured implements Tool {
public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java Tue Dec 27 18:19:43 2011
@@ -21,14 +21,17 @@ import java.util.List;
import java.util.UUID;
import junit.framework.TestCase;
-import protobuf.Uid;
-import protobuf.Uid.List.Builder;
+
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.aggregation.Aggregator;
+import protobuf.Uid;
+import protobuf.Uid.List.Builder;
+
+@SuppressWarnings("deprecation")
public class GlobalIndexUidAggregatorTest extends TestCase {
- Aggregator agg = new GlobalIndexUidAggregator();
+ Aggregator agg = new GlobalIndexUidAggregator();
private Uid.List.Builder createNewUidList() {
Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml Tue Dec 27 18:19:43 2011
@@ -171,6 +171,7 @@
<version.googlecollections>1.0</version.googlecollections>
<version.libthrift>0.6.1</version.libthrift>
<version.zookeeper>3.3.1</version.zookeeper>
+ <version.minlog>1.2</version.minlog>
</properties>
<dependencyManagement>
@@ -212,6 +213,11 @@
</dependency>
<dependency>
<groupId>com.googlecode</groupId>
+ <artifactId>minlog</artifactId>
+ <version>${version.minlog}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode</groupId>
<artifactId>kryo</artifactId>
<version>${version.kryo}</version>
</dependency>
Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java Tue Dec 27 18:19:43 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package iterator;
import java.io.IOException;
@@ -25,11 +25,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.jexl2.parser.ParseException;
-import org.apache.log4j.Logger;
-
-import parser.EventFields;
-import parser.QueryEvaluator;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
@@ -38,291 +33,291 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.commons.jexl2.parser.ParseException;
+import org.apache.log4j.Logger;
+
+import parser.EventFields;
+import parser.QueryEvaluator;
import com.esotericsoftware.kryo.Kryo;
/**
*
- * This iterator aggregates rows together using the specified key comparator. Subclasses will
- * provide their own implementation of fillMap which will fill the supplied EventFields object
- * with field names (key) and field values (value). After all fields have been put into the
- * aggregated object (by aggregating all columns with the same key), the EventFields object will be compared
- * against the supplied expression. If the expression returns true, then the return key and
- * return value can be retrieved via getTopKey() and getTopValue().
+ * This iterator aggregates rows together using the specified key comparator. Subclasses will provide their own implementation of fillMap which will fill the
+ * supplied EventFields object with field names (key) and field values (value). After all fields have been put into the aggregated object (by aggregating all
+ * columns with the same key), the EventFields object will be compared against the supplied expression. If the expression returns true, then the return key and
+ * return value can be retrieved via getTopKey() and getTopValue().
*
- * Optionally, the caller can set an expression (field operator value) that should not be evaluated against
- * the event. For example, if the query is "A == 'foo' and B == 'bar'", but for some reason B may not be in
- * the data, then setting the UNEVALUATED_EXPRESSIONS option to "B == 'bar'" will allow the events to be
- * evaluated against the remainder of the expression and still return as true.
+ * Optionally, the caller can set an expression (field operator value) that should not be evaluated against the event. For example, if the query is
+ * "A == 'foo' and B == 'bar'", but for some reason B may not be in the data, then setting the UNEVALUATED_EXPRESSIONS option to "B == 'bar'" will allow the
+ * events to be evaluated against the remainder of the expression and still return as true.
*
- * By default this iterator will return all Events in the shard. If the START_DATE and
- * END_DATE are specified, then this iterator will evaluate the timestamp of the key against
- * the start and end dates. If the event date is not within the range of start to end, then it is skipped.
+ * By default this iterator will return all Events in the shard. If the START_DATE and END_DATE are specified, then this iterator will evaluate the timestamp of
+ * the key against the start and end dates. If the event date is not within the range of start to end, then it is skipped.
*
- * This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value.
+ * This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value.
*
*/
-public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key, Value>, OptionDescriber {
-
- private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
- protected static final byte[] NULL_BYTE = new byte[0];
- public static final String QUERY_OPTION = "expr";
- public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
-
- private PartialKey comparator = null;
- private SortedKeyValueIterator<Key, Value> iterator;
- private Key currentKey = new Key();
- private Key returnKey;
- private Value returnValue;
- private String expression;
- private QueryEvaluator evaluator;
- private EventFields event = null;
- private static Kryo kryo = new Kryo();
- private Range seekRange = null;
- private Set<String> skipExpressions = null;
-
- protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
- iterator = other.iterator.deepCopy(env);
- event = other.event;
- }
-
- public AbstractEvaluatingIterator() {
- }
-
- /**
- * Implementations will return the PartialKey value to use for comparing keys for aggregating events
- *
- * @return the type of comparator to use
- */
- public abstract PartialKey getKeyComparator();
-
- /**
- * When the query expression evaluates to true against the event, the event fields
- * will be serialized into the Value and returned up the iterator stack. Implemenations
- * will need to provide a key to be used with the event.
- *
- * @param k
- * @return the key that should be returned with the map of values.
- */
- public abstract Key getReturnKey(Key k) throws Exception;
-
- /**
- * Implementations will need to fill the map with field visibilities, names, and
- * values. When all fields have been aggregated the event will be evaluated against
- * the query expression.
- *
- * @param event Multimap of event names and fields.
- * @param key current Key
- * @param value current Value
- */
- public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
-
- /**
- * Check to see if this key should be acted upon. Provides the ability to skip this key
- * and all of the following ones that match using the comparator.
- *
- * @param key
- * @return
- */
- public abstract boolean isKeyAccepted(Key key);
-
- /**
- * Reset state.
- */
- public void reset() {
- event.clear();
+public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+
+ private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
+ protected static final byte[] NULL_BYTE = new byte[0];
+ public static final String QUERY_OPTION = "expr";
+ public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
+
+ private PartialKey comparator = null;
+ protected SortedKeyValueIterator<Key,Value> iterator;
+ private Key currentKey = new Key();
+ private Key returnKey;
+ private Value returnValue;
+ private String expression;
+ private QueryEvaluator evaluator;
+ private EventFields event = null;
+ private static Kryo kryo = new Kryo();
+ private Range seekRange = null;
+ private Set<String> skipExpressions = null;
+
+ protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
+ iterator = other.iterator.deepCopy(env);
+ event = other.event;
+ }
+
+ public AbstractEvaluatingIterator() {}
+
+ /**
+ * Implementations will return the PartialKey value to use for comparing keys for aggregating events
+ *
+ * @return the type of comparator to use
+ */
+ public abstract PartialKey getKeyComparator();
+
+ /**
+ * When the query expression evaluates to true against the event, the event fields will be serialized into the Value and returned up the iterator stack.
+ * Implementations will need to provide a key to be used with the event.
+ *
+ * @param k
+ * @return the key that should be returned with the map of values.
+ */
+ public abstract Key getReturnKey(Key k) throws Exception;
+
+ /**
+ * Implementations will need to fill the map with field visibilities, names, and values. When all fields have been aggregated the event will be evaluated
+ * against the query expression.
+ *
+ * @param event
+ * Multimap of event names and fields.
+ * @param key
+ * current Key
+ * @param value
+ * current Value
+ */
+ public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
+
+ /**
+ * Check to see if this key should be acted upon. Provides the ability to skip this key and all of the following ones that match using the comparator.
+ *
+ * @param key
+ * @return
+ * @throws IOException
+ */
+ public abstract boolean isKeyAccepted(Key key) throws IOException;
+
+ /**
+ * Reset state.
+ */
+ public void reset() {
+ event.clear();
+ }
+
+ private void aggregateRowColumn(EventFields event) throws IOException { // Folds the source's top key and every following key equal to it under this.comparator into event, then sets returnKey.
+
+ currentKey.set(iterator.getTopKey()); // remember the first key of the group; the following keys are compared against it
+
+ try {
+ fillMap(event, iterator.getTopKey(), iterator.getTopValue());
+ iterator.next();
+
+ while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator)) {
+ fillMap(event, iterator.getTopKey(), iterator.getTopValue());
+ iterator.next();
+ }
+
+ // Get the return key; it is derived from the first key of the group, not from the key the source now points at
+ returnKey = getReturnKey(currentKey);
+ } catch (Exception e) {
+ throw new IOException("Error aggregating event", e);
}
-
- private void aggregateRowColumn(EventFields event) throws IOException {
-
- currentKey.set(iterator.getTopKey());
-
- try {
- fillMap(event, iterator.getTopKey(), iterator.getTopValue());
- iterator.next();
-
- while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator)) {
- fillMap(event, iterator.getTopKey(), iterator.getTopValue());
- iterator.next();
- }
-
- //Get the return key
- returnKey = getReturnKey(currentKey);
- } catch (Exception e) {
- throw new IOException("Error aggregating event", e);
+
+ }
+
+ private void findTop() throws IOException { // Aggregates and evaluates events until one satisfies the query (setting returnKey/returnValue) or the source runs out of keys.
+ do {
+ reset();
+ // check if aggregation is needed
+ if (iterator.hasTop()) {
+ // Check to see if the current key is accepted. For example in the wiki
+ // table there are field index rows. We don't want to process those in
+ // some cases, so consume all of the non-accepted keys before aggregating.
+ while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
+ iterator.next();
}
-
- }
-
- private void findTop() throws IOException {
- do {
- reset();
- //check if aggregation is needed
- if (iterator.hasTop()) {
- //Check to see if the current key is accepted. For example in the wiki
- //table there are field index rows. We don't want to process those in
- //some cases so return right away. Consume all of the non-accepted keys
- while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
- iterator.next();
- }
-
- if (iterator.hasTop()) {
- aggregateRowColumn(event);
-
- //Evaluate the event against the expression
- if (event.size() > 0 && this.evaluator.evaluate(event)) {
- if (log.isDebugEnabled()) {
- log.debug("Event evaluated to true, key = " + returnKey);
- }
- //Create a byte array
- byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
- //Wrap in ByteBuffer to work with Kryo
- ByteBuffer buf = ByteBuffer.wrap(serializedMap);
- //Serialize the EventFields object
- event.writeObjectData(kryo, buf);
- //Truncate array to the used size.
- returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
- } else {
- returnKey = null;
- returnValue = null;
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Iterator no longer has top.");
- }
- }
- } else {
- log.info("Iterator.hasTop() == false");
+
+ if (iterator.hasTop()) {
+ aggregateRowColumn(event);
+
+ // Evaluate the event against the expression; an event with no fields is never evaluated and counts as a non-match
+ if (event.size() > 0 && this.evaluator.evaluate(event)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Event evaluated to true, key = " + returnKey);
}
- } while (returnValue == null && iterator.hasTop());
-
- //Sanity check. Make sure both returnValue and returnKey are null or both are not null
- if (!((returnKey == null && returnValue == null) || (returnKey != null && returnValue != null))) {
- log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
- log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
- throw new IOException("Return values are inconsistent");
- }
- }
-
- public Key getTopKey() {
- if (returnKey != null) {
- return returnKey;
- }
- return iterator.getTopKey();
- }
-
- public Value getTopValue() {
- if (returnValue != null) {
- return returnValue;
- }
- return iterator.getTopValue();
- }
-
- public boolean hasTop() {
- return returnKey != null || iterator.hasTop();
- }
-
- public void next() throws IOException {
- if (returnKey != null) {
+ // Create a byte array sized from the event's reported byte size plus 20 extra bytes for each entry counted by event.size()
+ byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
+ // Wrap in ByteBuffer to work with Kryo
+ ByteBuffer buf = ByteBuffer.wrap(serializedMap);
+ // Serialize the EventFields object
+ event.writeObjectData(kryo, buf);
+ // Truncate array to the used size.
+ returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
+ } else {
returnKey = null;
returnValue = null;
- } else if (iterator.hasTop()) {
- iterator.next();
- }
-
- findTop();
- }
-
- /**
- * Copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError
- * @param range
- * @return
- */
- static Range maximizeStartKeyTimeStamp(Range range) {
- Range seekRange = range;
-
- if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
- Key seekKey = new Key(seekRange.getStartKey());
- seekKey.setTimestamp(Long.MAX_VALUE);
- seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
- }
-
- return seekRange;
- }
-
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- //do not want to seek to the middle of a value that should be
- //aggregated...
-
- seekRange = maximizeStartKeyTimeStamp(range);
-
- iterator.seek(seekRange, columnFamilies, inclusive);
- findTop();
-
- if (range.getStartKey() != null) {
- while (hasTop()
- && getTopKey().equals(range.getStartKey(), this.comparator)
- && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
- //the value has a more recent time stamp, so
- //pass it up
- //log.debug("skipping "+getTopKey());
- next();
- }
-
- while (hasTop() && range.beforeStartKey(getTopKey())) {
- next();
- }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Iterator no longer has top.");
+ }
}
-
+ } else {
+ log.debug("Iterator.hasTop() == false");
+ }
+ } while (returnValue == null && iterator.hasTop());
+
+ // Sanity check. Make sure both returnValue and returnKey are null or both are not null
+ if (!((returnKey == null && returnValue == null) || (returnKey != null && returnValue != null))) {
+ log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
+ log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
+ throw new IOException("Return values are inconsistent");
+ }
+ }
+
+ /**
+ * Returns the key of the pending evaluated event, or the source iterator's top key when no evaluated event is pending.
+ */
+ public Key getTopKey() {
+ return (returnKey == null) ? iterator.getTopKey() : returnKey;
+ }
+
+ /**
+ * Returns the serialized fields of the pending evaluated event, or the source iterator's top value when no evaluated event is pending.
+ */
+ public Value getTopValue() {
+ return (returnValue == null) ? iterator.getTopValue() : returnValue;
+ }
+
+ /**
+ * Reports whether an evaluated event is pending or, failing that, whether the source iterator still has a top entry.
+ */
+ public boolean hasTop() {
+ if (returnKey != null) {
+ return true;
+ }
+ return iterator.hasTop();
+ }
+
+ public void next() throws IOException { // Discards the pending result (or, when none is pending, steps the source past its top key) and then looks for the next matching event.
+ if (returnKey != null) {
+ returnKey = null;
+ returnValue = null;
+ } else if (iterator.hasTop()) {
+ iterator.next();
}
-
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- validateOptions(options);
- event = new EventFields();
- this.comparator = getKeyComparator();
- this.iterator = source;
- try {
- //Replace any expressions that we should not evaluate.
- if (null != this.skipExpressions && this.skipExpressions.size() != 0) {
- for (String skip : this.skipExpressions) {
- //Expression should have form: field<sp>operator<sp>literal.
- //We are going to replace the expression with field == null.
- String field = skip.substring(0, skip.indexOf(" ") -1);
- this.expression = this.expression.replaceAll(skip, field+" == null");
- }
- }
- this.evaluator = new QueryEvaluator(this.expression);
- } catch (ParseException e) {
- throw new IllegalArgumentException("Failed to parse query", e);
- }
- EventFields.initializeKryo(kryo);
+
+ findTop();
+ }
+
+ /**
+ * Copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError. When the range has a start key whose timestamp is not already Long.MAX_VALUE,
+ * a new range is built whose start key is a copy of the original with the timestamp set to Long.MAX_VALUE and whose start is inclusive; the end key and
+ * its inclusiveness are carried over.
+ *
+ * @param range
+ * the range requested by the caller
+ * @return the adjusted range, or the given range itself when it has no start key or its start key timestamp is already Long.MAX_VALUE
+ */
+ static Range maximizeStartKeyTimeStamp(Range range) {
+ Range seekRange = range;
+
+ if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
+ Key seekKey = new Key(seekRange.getStartKey());
+ seekKey.setTimestamp(Long.MAX_VALUE);
+ seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
}
-
- public IteratorOptions describeOptions() {
- Map<String,String> options = new HashMap<String,String>();
- options.put(QUERY_OPTION, "query expression");
- options.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");
- return new IteratorOptions(getClass().getSimpleName(), "evaluates event objects against an expression", options, null);
+
+ return seekRange;
+ }
+
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { // Seeks the source with a widened start key, finds the first matching event, then skips results that precede the requested start.
+ // do not want to seek to the middle of a value that should be
+ // aggregated, so the source is sought with the start key's timestamp raised to Long.MAX_VALUE
+
+ seekRange = maximizeStartKeyTimeStamp(range);
+
+ iterator.seek(seekRange, columnFamilies, inclusive);
+ findTop();
+
+ if (range.getStartKey() != null) {
+ while (hasTop() && getTopKey().equals(range.getStartKey(), this.comparator) && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
+ // the value has a more recent time stamp than the requested start key, so
+ // pass it up
+ // log.debug("skipping "+getTopKey());
+ next();
+ }
+
+ while (hasTop() && range.beforeStartKey(getTopKey())) {
+ next();
+ }
}
-
- public boolean validateOptions(Map<String, String> options) {
- if (!options.containsKey(QUERY_OPTION))
- return false;
- else
- this.expression = options.get(QUERY_OPTION);
-
- if (options.containsKey(UNEVALUTED_EXPRESSIONS)) {
- String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
- if (expressionList != null && !expressionList.trim().equals("")) {
- this.skipExpressions = new HashSet<String>();
- for (String e : expressionList.split(","))
- this.skipExpressions.add(e);
- }
+
+ }
+
+ /**
+ * Configures this iterator: reads the options, creates the event buffer, stores the key comparator and the source, rewrites every expression listed as
+ * unevaluated to "field == null", and builds the query evaluator from the resulting expression.
+ *
+ * @param source
+ * the iterator this iterator reads from
+ * @param options
+ * iterator options; read through validateOptions
+ * @param env
+ * iterator environment (not used here)
+ * @throws IOException
+ * declared by the iterator interface
+ * @throws IllegalArgumentException
+ * if the query cannot be parsed, or an unevaluated expression does not start with a field name followed by a space
+ */
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ validateOptions(options);
+ event = new EventFields();
+ this.comparator = getKeyComparator();
+ this.iterator = source;
+ try {
+ // Replace any expressions that we should not evaluate.
+ if (null != this.skipExpressions && this.skipExpressions.size() != 0) {
+ for (String skip : this.skipExpressions) {
+ // Expression should have form: field<sp>operator<sp>literal.
+ // We are going to replace the expression with field == null.
+ int fieldEnd = skip.indexOf(" ");
+ if (fieldEnd <= 0) {
+ throw new IllegalArgumentException("Expected an expression of the form 'field operator literal' but got: " + skip);
+ }
+ // The field name is everything before the first space; subtracting one here would drop its last character.
+ String field = skip.substring(0, fieldEnd);
+ // Literal replacement: the skipped expression is query text, not a regular expression, and may contain characters such as '(' , '.' or '$'.
+ this.expression = this.expression.replace(skip, field + " == null");
}
- return true;
- }
-
- public String getQueryExpression() {
- return this.expression;
- }
+ }
+ this.evaluator = new QueryEvaluator(this.expression);
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Failed to parse query", e);
+ }
+ EventFields.initializeKryo(kryo);
+ }
+
+ /**
+ * Describes this iterator and the two options it reads: the query expression and the list of expressions to leave unevaluated.
+ *
+ * @return the option description, named after the concrete class
+ */
+ public IteratorOptions describeOptions() {
+ Map<String,String> namedOptions = new HashMap<String,String>();
+ namedOptions.put(QUERY_OPTION, "query expression");
+ namedOptions.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");
+ String iteratorName = getClass().getSimpleName();
+ String description = "evaluates event objects against an expression";
+ return new IteratorOptions(iteratorName, description, namedOptions, null);
+ }
+
+ /**
+ * Checks the options and records them on this iterator: the query expression is stored, and the comma separated list of expressions to skip, when present
+ * and not blank, is split into a set. Each entry is trimmed and empty entries are dropped, so a list written as "a == 1, b == 2" or with a trailing comma
+ * yields entries that start with their field name.
+ *
+ * @param options
+ * the iterator options
+ * @return false if the query option is missing (nothing is recorded in that case), true otherwise
+ */
+ public boolean validateOptions(Map<String,String> options) {
+ if (!options.containsKey(QUERY_OPTION)) {
+ return false;
+ }
+ this.expression = options.get(QUERY_OPTION);
+
+ if (options.containsKey(UNEVALUTED_EXPRESSIONS)) {
+ String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
+ if (expressionList != null && !expressionList.trim().equals("")) {
+ this.skipExpressions = new HashSet<String>();
+ for (String e : expressionList.split(",")) {
+ // Whitespace around the separators is not part of the expression; untrimmed or empty entries break the field extraction in init().
+ String trimmed = e.trim();
+ if (trimmed.length() != 0) {
+ this.skipExpressions.add(trimmed);
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @return the query expression currently held by this iterator
+ */
+ public String getQueryExpression() {
+ return expression;
+ }
}