You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") in the original page and is not preserved in this copy.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2011/12/27 19:19:44 UTC

svn commit: r1224966 [1/10] - in /incubator/accumulo/branches/1.4: ./ contrib/accumulo_sample/ contrib/accumulo_sample/ingest/src/main/java/aggregator/ contrib/accumulo_sample/ingest/src/main/java/ingest/ contrib/accumulo_sample/ingest/src/test/java/ag...

Author: ecn
Date: Tue Dec 27 18:19:43 2011
New Revision: 1224966

URL: http://svn.apache.org/viewvc?rev=1224966&view=rev
Log:
ACCUMULO-230: merge fixes for problems in the sample application, found while testing trunk, into the 1.4 branch

Modified:
    incubator/accumulo/branches/1.4/   (props changed)
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicTreeNode.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/DefaultIteratorEnvironment.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/EvaluatingIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/FieldIndexIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/OptimizedQueryIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/parser/FieldIndexQueryReWriter.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/parser/QueryParser.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/parser/RangeCalculator.java
    incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/sample/query/Query.java

Propchange: incubator/accumulo/branches/1.4/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 27 18:19:43 2011
@@ -1,4 +1,4 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320
 /incubator/accumulo/branches/1.3.5rc:1209938
 /incubator/accumulo/branches/1.4:1205476
-/incubator/accumulo/trunk:1205476,1205570,1208726
+/incubator/accumulo/trunk:1205476,1205570,1208726,1222413,1222719,1222733-1222734

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java Tue Dec 27 18:19:43 2011
@@ -18,11 +18,11 @@ package aggregator;
 
 import java.util.HashSet;
 
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.aggregation.Aggregator;
 import org.apache.log4j.Logger;
 
 import protobuf.Uid;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.aggregation.Aggregator;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -32,6 +32,7 @@ import com.google.protobuf.InvalidProtoc
  * index for low cardinality terms (Low in this case being less than 20). 
  *
  */
+@SuppressWarnings("deprecation")
 public class GlobalIndexUidAggregator implements Aggregator {
 
 	private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java Tue Dec 27 18:19:43 2011
@@ -20,11 +20,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.aggregation.Aggregator;
 import org.apache.log4j.Logger;
 
 import protobuf.TermWeight;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.aggregation.Aggregator;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -32,6 +32,7 @@ import com.google.protobuf.InvalidProtoc
  * An Aggregator to merge together a list of term offsets and one normalized term frequency
  *
  */
+@SuppressWarnings("deprecation")
 public class TextIndexAggregator implements Aggregator {
     private static final Logger log = Logger.getLogger(TextIndexAggregator.class);
     

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java Tue Dec 27 18:19:43 2011
@@ -18,6 +18,11 @@ package ingest;
 
 import java.io.IOException;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -25,119 +30,120 @@ import org.apache.hadoop.util.Reflection
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.SimpleAnalyzer;
 
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-
-
 public class WikipediaConfiguration {
-	public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
-	public final static String USER = "wikipedia.accumulo.user";
-	public final static String PASSWORD = "wikipedia.accumulo.password";
-	public final static String TABLE_NAME = "wikipedia.accumulo.table";
-	
-	public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
-
-	public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
-	public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
-	public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
-		
-	public final static String ANALYZER = "wikipedia.index.analyzer";
-	
-	public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
-	
-	public static String getUser(Configuration conf) { return conf.get(USER); };
-	
-	public static byte[] getPassword(Configuration conf) { 
-		String pass = conf.get(PASSWORD);
-		if (pass == null) {
-			return null;
-		}
-		return pass.getBytes();
-	}
-	
-	public static String getTableName(Configuration conf) {
-		String tablename = conf.get(TABLE_NAME);
-		if (tablename == null) {
-			throw new RuntimeException("No data table name specified in " + TABLE_NAME);
-		}
-		return tablename;
-	}
-	
-	public static String getInstanceName(Configuration conf) { return conf.get(INSTANCE_NAME); }
-	
-	public static String getZookeepers(Configuration conf) {
-		String zookeepers = conf.get(ZOOKEEPERS);
-		if (zookeepers == null) {
-			throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
-		}
-		return zookeepers;
-	}
-		
-	public static Path getNamespacesFile(Configuration conf) {
-		String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
-		return new Path(filename);
-	}
-	public static Path getLanguagesFile(Configuration conf) {
-		String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
-		return new Path(filename);
-	}
-	public static Path getWorkingDirectory(Configuration conf) {
-		String filename = conf.get(WORKING_DIRECTORY);
-		return new Path(filename);
-	}
-	
-	public static Analyzer getAnalyzer(Configuration conf) throws IOException {
-		Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
-		return ReflectionUtils.newInstance(analyzerClass, conf);
-	}
-	
-	public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
-		return new Connector(getInstance(conf), getUser(conf), getPassword(conf));
-	}
-
-	public static Instance getInstance(Configuration conf) {
-		return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
-	}
-	
-	public static int getNumPartitions(Configuration conf) {
-		return conf.getInt(NUM_PARTITIONS, 25);
-	}
-	
-	/**
-	 * Helper method to get properties from Hadoop configuration
-	 * @param <T>
-	 * @param conf
-	 * @param propertyName
-	 * @param resultClass
-	 * @throws IllegalArgumentException if property is not defined, null, or empty. Or if resultClass is not handled.
-	 * @return value of property
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
-		String p = conf.get(propertyName);
-		if (StringUtils.isEmpty(p))
-			throw new IllegalArgumentException(propertyName + " must be specified");
-		
-		if (resultClass.equals(String.class))
-			return (T) p;
-		else if (resultClass.equals(String[].class))
-			return (T) conf.getStrings(propertyName);
-		else if (resultClass.equals(Boolean.class))
-			return (T) Boolean.valueOf(p);
-		else if (resultClass.equals(Long.class))
-			return (T) Long.valueOf(p);
-		else if (resultClass.equals(Integer.class))
-			return (T) Integer.valueOf(p);
-		else if (resultClass.equals(Float.class))
-			return (T) Float.valueOf(p);
-		else if (resultClass.equals(Double.class))
-			return (T) Double.valueOf(p);
-		else
-			throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
-			
-	}
-
+  public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
+  public final static String USER = "wikipedia.accumulo.user";
+  public final static String PASSWORD = "wikipedia.accumulo.password";
+  public final static String TABLE_NAME = "wikipedia.accumulo.table";
+  
+  public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
+  
+  public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
+  public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
+  public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
+  
+  public final static String ANALYZER = "wikipedia.index.analyzer";
+  
+  public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+  
+  public static String getUser(Configuration conf) {
+    return conf.get(USER);
+  };
+  
+  public static byte[] getPassword(Configuration conf) {
+    String pass = conf.get(PASSWORD);
+    if (pass == null) {
+      return null;
+    }
+    return pass.getBytes();
+  }
+  
+  public static String getTableName(Configuration conf) {
+    String tablename = conf.get(TABLE_NAME);
+    if (tablename == null) {
+      throw new RuntimeException("No data table name specified in " + TABLE_NAME);
+    }
+    return tablename;
+  }
+  
+  public static String getInstanceName(Configuration conf) {
+    return conf.get(INSTANCE_NAME);
+  }
+  
+  public static String getZookeepers(Configuration conf) {
+    String zookeepers = conf.get(ZOOKEEPERS);
+    if (zookeepers == null) {
+      throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
+    }
+    return zookeepers;
+  }
+  
+  public static Path getNamespacesFile(Configuration conf) {
+    String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
+    return new Path(filename);
+  }
+  
+  public static Path getLanguagesFile(Configuration conf) {
+    String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
+    return new Path(filename);
+  }
+  
+  public static Path getWorkingDirectory(Configuration conf) {
+    String filename = conf.get(WORKING_DIRECTORY);
+    return new Path(filename);
+  }
+  
+  public static Analyzer getAnalyzer(Configuration conf) throws IOException {
+    Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
+    return ReflectionUtils.newInstance(analyzerClass, conf);
+  }
+  
+  public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+    return getInstance(conf).getConnector(getUser(conf), getPassword(conf));
+  }
+  
+  public static Instance getInstance(Configuration conf) {
+    return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
+  }
+  
+  public static int getNumPartitions(Configuration conf) {
+    return conf.getInt(NUM_PARTITIONS, 25);
+  }
+  
+  /**
+   * Helper method to get properties from Hadoop configuration
+   * 
+   * @param <T>
+   * @param conf
+   * @param propertyName
+   * @param resultClass
+   * @throws IllegalArgumentException
+   *           if property is not defined, null, or empty. Or if resultClass is not handled.
+   * @return value of property
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
+    String p = conf.get(propertyName);
+    if (StringUtils.isEmpty(p))
+      throw new IllegalArgumentException(propertyName + " must be specified");
+    
+    if (resultClass.equals(String.class))
+      return (T) p;
+    else if (resultClass.equals(String[].class))
+      return (T) conf.getStrings(propertyName);
+    else if (resultClass.equals(Boolean.class))
+      return (T) Boolean.valueOf(p);
+    else if (resultClass.equals(Long.class))
+      return (T) Long.valueOf(p);
+    else if (resultClass.equals(Integer.class))
+      return (T) Integer.valueOf(p);
+    else if (resultClass.equals(Float.class))
+      return (T) Float.valueOf(p);
+    else if (resultClass.equals(Double.class))
+      return (T) Double.valueOf(p);
+    else
+      throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
+    
+  }
+  
 }

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java Tue Dec 27 18:19:43 2011
@@ -26,6 +26,18 @@ import java.util.TreeSet;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.aggregation.NumSummation;
+import org.apache.accumulo.core.iterators.aggregation.conf.AggregatorConfiguration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -40,19 +52,8 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 import reader.AggregatingRecordReader;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.aggregation.NumSummation;
-import org.apache.accumulo.core.iterators.aggregation.conf.AggregatorConfiguration;
 
+@SuppressWarnings("deprecation")
 public class WikipediaIngester extends Configured implements Tool {
 	
 	public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java Tue Dec 27 18:19:43 2011
@@ -21,14 +21,17 @@ import java.util.List;
 import java.util.UUID;
 
 import junit.framework.TestCase;
-import protobuf.Uid;
-import protobuf.Uid.List.Builder;
+
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.aggregation.Aggregator;
 
+import protobuf.Uid;
+import protobuf.Uid.List.Builder;
+
+@SuppressWarnings("deprecation")
 public class GlobalIndexUidAggregatorTest extends TestCase {
 	
-	Aggregator agg = new GlobalIndexUidAggregator();
+  Aggregator agg = new GlobalIndexUidAggregator();
 	
 
 	private Uid.List.Builder createNewUidList() {

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/pom.xml Tue Dec 27 18:19:43 2011
@@ -171,6 +171,7 @@
     <version.googlecollections>1.0</version.googlecollections>
     <version.libthrift>0.6.1</version.libthrift>
     <version.zookeeper>3.3.1</version.zookeeper>
+    <version.minlog>1.2</version.minlog>
   </properties>
 
   <dependencyManagement>
@@ -212,6 +213,11 @@
       </dependency>
       <dependency>
         <groupId>com.googlecode</groupId>
+        <artifactId>minlog</artifactId>
+        <version>${version.minlog}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.googlecode</groupId>
         <artifactId>kryo</artifactId>
         <version>${version.kryo}</version>
       </dependency>

Modified: incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java?rev=1224966&r1=1224965&r2=1224966&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java (original)
+++ incubator/accumulo/branches/1.4/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java Tue Dec 27 18:19:43 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package iterator;
 
 import java.io.IOException;
@@ -25,11 +25,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.jexl2.parser.ParseException;
-import org.apache.log4j.Logger;
-
-import parser.EventFields;
-import parser.QueryEvaluator;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -38,291 +33,291 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.OptionDescriber;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.commons.jexl2.parser.ParseException;
+import org.apache.log4j.Logger;
+
+import parser.EventFields;
+import parser.QueryEvaluator;
 
 import com.esotericsoftware.kryo.Kryo;
 
 /**
  * 
- * This iterator aggregates rows together using the specified key comparator. Subclasses will
- * provide their own implementation of fillMap which will fill the supplied EventFields object
- * with field names (key) and field values (value). After all fields have been put into the 
- * aggregated object (by aggregating all columns with the same key), the EventFields object will be compared
- * against the supplied expression. If the expression returns true, then the return key and
- * return value can be retrieved via getTopKey() and getTopValue(). 
+ * This iterator aggregates rows together using the specified key comparator. Subclasses will provide their own implementation of fillMap which will fill the
+ * supplied EventFields object with field names (key) and field values (value). After all fields have been put into the aggregated object (by aggregating all
+ * columns with the same key), the EventFields object will be compared against the supplied expression. If the expression returns true, then the return key and
+ * return value can be retrieved via getTopKey() and getTopValue().
  * 
- * Optionally, the caller can set an expression (field operator value) that should not be evaluated against
- * the event. For example, if the query is "A == 'foo' and B == 'bar'", but for some reason B may not be in
- * the data, then setting the UNEVALUATED_EXPRESSIONS option to "B == 'bar'" will allow the events to be
- * evaluated against the remainder of the expression and still return as true.
+ * Optionally, the caller can set an expression (field operator value) that should not be evaluated against the event. For example, if the query is
+ * "A == 'foo' and B == 'bar'", but for some reason B may not be in the data, then setting the UNEVALUATED_EXPRESSIONS option to "B == 'bar'" will allow the
+ * events to be evaluated against the remainder of the expression and still return as true.
  * 
- * By default this iterator will return all Events in the shard. If the START_DATE and
- * END_DATE are specified, then this iterator will evaluate the timestamp of the key against
- * the start and end dates. If the event date is not within the range of start to end, then it is skipped.
+ * By default this iterator will return all Events in the shard. If the START_DATE and END_DATE are specified, then this iterator will evaluate the timestamp of
+ * the key against the start and end dates. If the event date is not within the range of start to end, then it is skipped.
  * 
- * This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value. 
+ * This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value.
  * 
  */
-public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key, Value>, OptionDescriber {
-
-    private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
-    protected static final byte[] NULL_BYTE = new byte[0];
-    public static final String QUERY_OPTION = "expr";
-    public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
-    
-    private PartialKey comparator = null;
-    private SortedKeyValueIterator<Key, Value> iterator;
-    private Key currentKey = new Key();
-    private Key returnKey;
-    private Value returnValue;
-    private String expression;
-    private QueryEvaluator evaluator;
-    private EventFields event = null;
-    private static Kryo kryo = new Kryo();
-    private Range seekRange = null;
-    private Set<String> skipExpressions = null;
-
-    protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
-        iterator = other.iterator.deepCopy(env);
-        event = other.event;
-    }
-
-    public AbstractEvaluatingIterator() {
-    }
-
-    /**
-     * Implementations will return the PartialKey value to use for comparing keys for aggregating events
-     *
-     * @return the type of comparator to use
-     */
-    public abstract PartialKey getKeyComparator();
-
-    /**
-     * When the query expression evaluates to true against the event, the event fields
-     * will be serialized into the Value and returned up the iterator stack. Implemenations
-     * will need to provide a key to be used with the event.
-     *
-     * @param k
-     * @return the key that should be returned with the map of values.
-     */
-    public abstract Key getReturnKey(Key k) throws Exception;
-
-    /**
-     * Implementations will need to fill the map with field visibilities, names, and
-     * values. When all fields have been aggregated the event will be evaluated against
-     * the query expression.
-     *
-     * @param event Multimap of event names and fields.
-     * @param key current Key
-     * @param value current Value
-     */
-    public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
-
-    /**
-     * Check to see if this key should be acted upon. Provides the ability to skip this key
-     * and all of the following ones that match using the comparator.
-     *
-     * @param key
-     * @return
-     */
-    public abstract boolean isKeyAccepted(Key key);
-
-    /**
-     * Reset state.
-     */
-    public void reset() {
-        event.clear();
+public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+  
+  private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
+  protected static final byte[] NULL_BYTE = new byte[0];
+  public static final String QUERY_OPTION = "expr";
+  public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
+  
+  private PartialKey comparator = null;
+  protected SortedKeyValueIterator<Key,Value> iterator;
+  private Key currentKey = new Key();
+  private Key returnKey;
+  private Value returnValue;
+  private String expression;
+  private QueryEvaluator evaluator;
+  private EventFields event = null;
+  private static Kryo kryo = new Kryo();
+  private Range seekRange = null;
+  private Set<String> skipExpressions = null;
+  
+  protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
+    iterator = other.iterator.deepCopy(env);
+    event = other.event;
+  }
+  
+  public AbstractEvaluatingIterator() {}
+  
+  /**
+   * Implementations return the PartialKey used to decide which consecutive keys are aggregated into the same event.
+   * 
+   * @return the type of comparator to use
+   */
+  public abstract PartialKey getKeyComparator();
+  
+  /**
+   * When the query expression evaluates to true against the event, the event fields will be serialized into the Value and returned up the iterator stack.
+   * Implementations will need to provide a key to be used with the event.
+   * 
+   * @param k the first key of the aggregated event
+   * @return the key that should be returned with the map of values.
+   */
+  public abstract Key getReturnKey(Key k) throws Exception;
+  
+  /**
+   * Implementations will need to fill the map with field visibilities, names, and values. When all fields have been aggregated the event will be evaluated
+   * against the query expression.
+   * 
+   * @param event
+   *          the event being aggregated; cleared by reset() before each new event
+   * @param key
+   *          current Key
+   * @param value
+   *          current Value
+   */
+  public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
+  
+  /**
+   * Check to see if this key should be acted upon. Non-accepted keys are skipped one at a time before aggregation of the next event begins.
+   * 
+   * @param key the current top key of the source
+   * @return true if the key should be aggregated, false to skip it
+   * @throws IOException if the check fails
+   */
+  public abstract boolean isKeyAccepted(Key key) throws IOException;
+  
+  /**
+   * Clears the aggregated event; called at the start of every findTop() pass.
+   */
+  public void reset() {
+    event.clear();
+  }
+  
+  private void aggregateRowColumn(EventFields event) throws IOException { // feeds every consecutive entry equal (under the comparator) to the first one into fillMap
+    
+    currentKey.set(iterator.getTopKey());
+    
+    try {
+      fillMap(event, iterator.getTopKey(), iterator.getTopValue());
+      iterator.next();
+      
+      while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator)) {
+        fillMap(event, iterator.getTopKey(), iterator.getTopValue());
+        iterator.next();
+      }
+      
+      // Derive the key to emit from the first key of the group
+      returnKey = getReturnKey(currentKey);
+    } catch (Exception e) {
+      throw new IOException("Error aggregating event", e);
     }
-
-    private void aggregateRowColumn(EventFields event) throws IOException {
-
-        currentKey.set(iterator.getTopKey());
-
-        try {
-            fillMap(event, iterator.getTopKey(), iterator.getTopValue());
-            iterator.next();
-
-            while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator)) {
-                fillMap(event, iterator.getTopKey(), iterator.getTopValue());
-                iterator.next();
-            }
-
-            //Get the return key
-            returnKey = getReturnKey(currentKey);
-        } catch (Exception e) {
-            throw new IOException("Error aggregating event", e);
+    
+  }
+  
+  private void findTop() throws IOException { // advances until an event satisfies the expression or the source is exhausted
+    do {
+      reset();
+      // Nothing to aggregate once the source is exhausted
+      if (iterator.hasTop()) {
+        // Skip keys the subclass does not accept. For example, in the wiki
+        // table there are field index rows that some implementations do not want to
+        // process. Every consecutive non-accepted key is consumed here.
+        while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
+          iterator.next();
         }
-
-    }
-
-    private void findTop() throws IOException {
-        do {
-            reset();
-            //check if aggregation is needed
-            if (iterator.hasTop()) {
-                //Check to see if the current key is accepted. For example in the wiki
-                //table there are field index rows. We don't want to process those in
-                //some cases so return right away. Consume all of the non-accepted keys
-                while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
-                    iterator.next();
-                }
-
-                if (iterator.hasTop()) {
-                    aggregateRowColumn(event);
-                    
-                    //Evaluate the event against the expression
-                    if (event.size() > 0 && this.evaluator.evaluate(event)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Event evaluated to true, key = " + returnKey);
-                        }
-                        //Create a byte array
-                        byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
-                        //Wrap in ByteBuffer to work with Kryo
-                        ByteBuffer buf = ByteBuffer.wrap(serializedMap);
-                        //Serialize the EventFields object
-                        event.writeObjectData(kryo, buf);
-                        //Truncate array to the used size.
-                        returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
-                    } else {
-                        returnKey = null;
-                        returnValue = null;
-                    }
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Iterator no longer has top.");
-                    }
-                }
-            } else {
-                log.info("Iterator.hasTop() == false");
+        
+        if (iterator.hasTop()) {
+          aggregateRowColumn(event);
+          
+          // Empty events are never emitted; otherwise evaluate the event against the expression
+          if (event.size() > 0 && this.evaluator.evaluate(event)) {
+            if (log.isDebugEnabled()) {
+              log.debug("Event evaluated to true, key = " + returnKey);
             }
-        } while (returnValue == null && iterator.hasTop());
-
-        //Sanity check. Make sure both returnValue and returnKey are null or both are not null
-        if (!((returnKey == null && returnValue == null) || (returnKey != null && returnValue != null))) {
-            log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
-            log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
-            throw new IOException("Return values are inconsistent");
-        }
-    }
-
-    public Key getTopKey() {
-        if (returnKey != null) {
-            return returnKey;
-        }
-        return iterator.getTopKey();
-    }
-
-    public Value getTopValue() {
-        if (returnValue != null) {
-            return returnValue;
-        }
-        return iterator.getTopValue();
-    }
-
-    public boolean hasTop() {
-        return returnKey != null || iterator.hasTop();
-    }
-
-    public void next() throws IOException {
-        if (returnKey != null) {
+            // Allocate getByteSize() plus 20 bytes per entry. NOTE(review): confirm this bound always suffices for writeObjectData
+            byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
+            // Wrap in ByteBuffer to work with Kryo
+            ByteBuffer buf = ByteBuffer.wrap(serializedMap);
+            // Serialize the EventFields object
+            event.writeObjectData(kryo, buf);
+            // Truncate array to the used size.
+            returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
+          } else {
             returnKey = null;
             returnValue = null;
-        } else if (iterator.hasTop()) {
-            iterator.next();
-        }
-
-        findTop();
-    }
-
-    /**
-     * Copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError
-     * @param range
-     * @return
-     */
-    static Range maximizeStartKeyTimeStamp(Range range) {
-        Range seekRange = range;
-
-        if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
-            Key seekKey = new Key(seekRange.getStartKey());
-            seekKey.setTimestamp(Long.MAX_VALUE);
-            seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
-        }
-
-        return seekRange;
-    }
-
-    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-        //do not want to seek to the middle of a value that should be
-        //aggregated...
-
-        seekRange = maximizeStartKeyTimeStamp(range);
-
-        iterator.seek(seekRange, columnFamilies, inclusive);
-        findTop();
-
-        if (range.getStartKey() != null) {
-            while (hasTop()
-                    && getTopKey().equals(range.getStartKey(), this.comparator)
-                    && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
-                //the value has a more recent time stamp, so
-                //pass it up
-                //log.debug("skipping "+getTopKey());
-                next();
-            }
-
-            while (hasTop() && range.beforeStartKey(getTopKey())) {
-                next();
-            }
+          }
+        } else {
+          if (log.isDebugEnabled()) {
+            log.debug("Iterator no longer has top.");
+          }
         }
-
+      } else {
+        log.debug("Iterator.hasTop() == false");
+      }
+    } while (returnValue == null && iterator.hasTop());
+    
+    // Sanity check: returnKey and returnValue must be either both null or both non-null
+    if (!((returnKey == null && returnValue == null) || (returnKey != null && returnValue != null))) {
+      log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
+      log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
+      throw new IOException("Return values are inconsistent");
+    }
+  }
+  
+  public Key getTopKey() { // the matched event's key when one is pending, otherwise the source's top key
+    if (returnKey != null) {
+      return returnKey;
+    }
+    return iterator.getTopKey();
+  }
+  
+  public Value getTopValue() { // the serialized event when one is pending, otherwise the source's top value
+    if (returnValue != null) {
+      return returnValue;
+    }
+    return iterator.getTopValue();
+  }
+  
+  public boolean hasTop() { // true while a matched event is pending or the source still has entries
+    return returnKey != null || iterator.hasTop();
+  }
+  
+  public void next() throws IOException { // drops the pending event (or skips one source entry), then looks for the next match
+    if (returnKey != null) {
+      returnKey = null;
+      returnValue = null;
+    } else if (iterator.hasTop()) {
+      iterator.next();
     }
-
-    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
-        validateOptions(options);
-        event = new EventFields();
-        this.comparator = getKeyComparator();
-        this.iterator = source;
-        try {
-        	//Replace any expressions that we should not evaluate.
-        	if (null != this.skipExpressions && this.skipExpressions.size() != 0) {
-        		for (String skip : this.skipExpressions) {
-        			//Expression should have form: field<sp>operator<sp>literal.
-        			//We are going to replace the expression with field == null.
-        			String field = skip.substring(0, skip.indexOf(" ") -1);
-        			this.expression = this.expression.replaceAll(skip, field+" == null");
-        		}
-        	}
-            this.evaluator = new QueryEvaluator(this.expression);
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Failed to parse query", e);
-        }
-        EventFields.initializeKryo(kryo);
+    
+    findTop();
+  }
+  
+  /**
+   * Raises the start key's timestamp to Long.MAX_VALUE (copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError)
+   * 
+   * @param range the range to adjust
+   * @return the same range if its start key is null or already at Long.MAX_VALUE, otherwise a new range with an inclusive, timestamp-maximized start key
+   */
+  static Range maximizeStartKeyTimeStamp(Range range) {
+    Range seekRange = range;
+    
+    if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
+      Key seekKey = new Key(seekRange.getStartKey());
+      seekKey.setTimestamp(Long.MAX_VALUE);
+      seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
     }
-
-    public IteratorOptions describeOptions() {
-    	Map<String,String> options = new HashMap<String,String>();
-    	options.put(QUERY_OPTION, "query expression");
-    	options.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");
-        return new IteratorOptions(getClass().getSimpleName(), "evaluates event objects against an expression", options, null);
+    
+    return seekRange;
+  }
+  
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    // Widen the start key's timestamp so the source is not positioned in the middle
+    // of a group of keys that should be aggregated together.
+    
+    seekRange = maximizeStartKeyTimeStamp(range);
+    
+    iterator.seek(seekRange, columnFamilies, inclusive);
+    findTop();
+    
+    if (range.getStartKey() != null) {
+      while (hasTop() && getTopKey().equals(range.getStartKey(), this.comparator) && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
+        // The top key matches the requested start key under the comparator but
+        // carries a more recent timestamp than the start key,
+        // so skip past it.
+        next();
+      }
+      
+      while (hasTop() && range.beforeStartKey(getTopKey())) {
+        next();
+      }
     }
-
-    public boolean validateOptions(Map<String, String> options) {
-        if (!options.containsKey(QUERY_OPTION))
-            return false;
-        else 
-        	this.expression = options.get(QUERY_OPTION);
-        
-        if (options.containsKey(UNEVALUTED_EXPRESSIONS)) {
-        	String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
-        	if (expressionList != null && !expressionList.trim().equals("")) {
-	        	this.skipExpressions = new HashSet<String>();
-	        	for (String e : expressionList.split(","))
-	        		this.skipExpressions.add(e);
-        	}
+    
+  }
+  
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { // reads options, builds the evaluator, prepares the shared Kryo instance
+    if (!validateOptions(options)) throw new IllegalArgumentException("Missing required option: " + QUERY_OPTION);
+    event = new EventFields();
+    this.comparator = getKeyComparator();
+    this.iterator = source;
+    try {
+      // Replace any expressions that we should not evaluate.
+      if (null != this.skipExpressions && this.skipExpressions.size() != 0) {
+        for (String rawSkip : this.skipExpressions) {
+          String skip = rawSkip.trim(); // expected form: field<sp>operator<sp>literal; list entries may carry spaces after commas
+          // Substitute "field == null" for the whole expression, matched literally rather than as a regex.
+          String field = skip.substring(0, skip.indexOf(' ')); // the field name: everything before the first space
+          this.expression = this.expression.replace(skip, field + " == null");
         }
-        return true;
-    }
-
-    public String getQueryExpression() {
-        return this.expression;
-    }
+      }
+      this.evaluator = new QueryEvaluator(this.expression);
+    } catch (ParseException e) {
+      throw new IllegalArgumentException("Failed to parse query", e);
+    }
+    EventFields.initializeKryo(kryo);
+  }
+  
+  public IteratorOptions describeOptions() { // advertises the two option keys read by validateOptions()
+    Map<String,String> options = new HashMap<String,String>();
+    options.put(QUERY_OPTION, "query expression");
+    options.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");
+    return new IteratorOptions(getClass().getSimpleName(), "evaluates event objects against an expression", options, null);
+  }
+  
+  public boolean validateOptions(Map<String,String> options) { // also stores the expression and skip list as a side effect; false only when QUERY_OPTION is absent
+    if (!options.containsKey(QUERY_OPTION))
+      return false;
+    else
+      this.expression = options.get(QUERY_OPTION);
+    
+    if (options.containsKey(UNEVALUTED_EXPRESSIONS)) {
+      String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
+      if (expressionList != null && !expressionList.trim().equals("")) {
+        this.skipExpressions = new HashSet<String>();
+        for (String e : expressionList.split(",")) // entries are stored untrimmed
+          this.skipExpressions.add(e);
+      }
+    }
+    return true;
+  }
+  
+  public String getQueryExpression() { // after init(), reflects the skip-expression substitutions made there
+    return this.expression;
+  }
 }