Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC

svn commit: r1210600 [1/16] - in /incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/ ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/ ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/ ingest/src/main/java/protobuf/ ...

Author: billie
Date: Mon Dec  5 20:05:49 2011
New Revision: 1210600

URL: http://svn.apache.org/viewvc?rev=1210600&view=rev
Log:
ACCUMULO-41 formatted java and pom files

Modified:
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/iterator/TotalAggregatingIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/LcNoDiacriticsNormalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/NoOpNormalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/Normalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/NumberNormalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/protobuf/TermWeight.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/protobuf/Uid.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/AggregatingRecordReader.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LfLineReader.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LongLineRecordReader.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/util/TextUtil.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/TextIndexAggregatorTest.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/StandaloneStatusReporter.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/WikipediaMapperTest.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/normalizer/testNumberNormalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/reader/AggregatingRecordReaderTest.java
    incubator/accumulo/trunk/contrib/accumulo_sample/pom.xml
    incubator/accumulo/trunk/contrib/accumulo_sample/query-war/pom.xml
    incubator/accumulo/trunk/contrib/accumulo_sample/query/pom.xml
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/function/QueryFunctions.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicTreeNode.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/DefaultIteratorEnvironment.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/EvaluatingIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/FieldIndexIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OptimizedQueryIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/jexl/Arithmetic.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/ContentLogic.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/FieldIndexQueryReWriter.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/JexlOperatorConstants.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/QueryEvaluator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/QueryParser.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/RangeCalculator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/TreeBuilder.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/TreeNode.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Document.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Field.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Results.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/query/IQuery.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/query/Query.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml Mon Dec  5 20:05:49 2011
@@ -1,20 +1,20 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
+  <!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
 
   <modelVersion>4.0.0</modelVersion>
   <parent>

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package aggregator;
 
 import java.util.HashSet;
@@ -27,62 +27,61 @@ import org.apache.accumulo.core.iterator
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization
- * for the global index and global reverse index, where the list of UIDs for events will be maintained in the
- * index for low cardinality terms (Low in this case being less than 20). 
- *
+ * Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization for the global index and global reverse index, where
+ * the list of UIDs for events will be maintained in the index for low cardinality terms (Low in this case being less than 20).
+ * 
  */
 public class GlobalIndexUidAggregator implements Aggregator {
-
-	private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);
-	private Uid.List.Builder builder = Uid.List.newBuilder();
-	//Using a set instead of a list so that duplicate IDs are filtered out of the list.
-	private HashSet<String> uids = new HashSet<String>();
-	private boolean seenIgnore = false;
-	public static final int MAX = 20;
-	private long count = 0;
-	
-	@Override
-	public Value aggregate() {
-		//Special case logic
-		//If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true
-		//However, always maintain the count
-		if (uids.size() > MAX || seenIgnore) {
-			builder.setCOUNT(count);
-			builder.setIGNORE(true);
-			builder.clearUID();
-		} else {
-			builder.setCOUNT(count);
-			builder.setIGNORE(false);
-			builder.addAllUID(uids);
-		}
-		return new Value(builder.build().toByteArray());
-	}
-
-	@Override
-	public void collect(Value value) {
-		if (null == value || value.get().length == 0)
-			return;
-		//Collect the values, which are serialized Uid.List objects
-		try {
-			Uid.List v = Uid.List.parseFrom(value.get());
-			count = count + v.getCOUNT();
-			if (v.getIGNORE()) {
-				seenIgnore = true;
-			}
-			//Add the incoming list to this list
-			uids.addAll(v.getUIDList());
-		} catch (InvalidProtocolBufferException e) {
-			log.error("Value passed to aggregator was not of type Uid.List", e);
-		}
-	}
-
-	@Override
-	public void reset() {
-		count = 0;
-		seenIgnore = false;
-		builder = Uid.List.newBuilder();
-		uids.clear();
-	}
-
+  
+  private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);
+  private Uid.List.Builder builder = Uid.List.newBuilder();
+  // Using a set instead of a list so that duplicate IDs are filtered out of the list.
+  private HashSet<String> uids = new HashSet<String>();
+  private boolean seenIgnore = false;
+  public static final int MAX = 20;
+  private long count = 0;
+  
+  @Override
+  public Value aggregate() {
+    // Special case logic
+    // If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true
+    // However, always maintain the count
+    if (uids.size() > MAX || seenIgnore) {
+      builder.setCOUNT(count);
+      builder.setIGNORE(true);
+      builder.clearUID();
+    } else {
+      builder.setCOUNT(count);
+      builder.setIGNORE(false);
+      builder.addAllUID(uids);
+    }
+    return new Value(builder.build().toByteArray());
+  }
+  
+  @Override
+  public void collect(Value value) {
+    if (null == value || value.get().length == 0)
+      return;
+    // Collect the values, which are serialized Uid.List objects
+    try {
+      Uid.List v = Uid.List.parseFrom(value.get());
+      count = count + v.getCOUNT();
+      if (v.getIGNORE()) {
+        seenIgnore = true;
+      }
+      // Add the incoming list to this list
+      uids.addAll(v.getUIDList());
+    } catch (InvalidProtocolBufferException e) {
+      log.error("Value passed to aggregator was not of type Uid.List", e);
+    }
+  }
+  
+  @Override
+  public void reset() {
+    count = 0;
+    seenIgnore = false;
+    builder = Uid.List.newBuilder();
+    uids.clear();
+  }
+  
 }
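
For reference, the collect/aggregate contract above can be exercised standalone. A minimal sketch, assuming only the generated Uid.List builder methods visible in this diff (setCOUNT, setIGNORE, addAllUID, parseFrom); the sample UIDs and counts are hypothetical:

import java.util.Collections;

import org.apache.accumulo.core.data.Value;

import aggregator.GlobalIndexUidAggregator;
import protobuf.Uid;

public class UidAggregatorSketch {
  public static void main(String[] args) throws Exception {
    GlobalIndexUidAggregator agg = new GlobalIndexUidAggregator();
    
    // Two serialized Uid.List values, as they would arrive from the global index
    Value v1 = new Value(Uid.List.newBuilder().setCOUNT(1).setIGNORE(false).addAllUID(Collections.singleton("uid-1")).build().toByteArray());
    Value v2 = new Value(Uid.List.newBuilder().setCOUNT(1).setIGNORE(false).addAllUID(Collections.singleton("uid-2")).build().toByteArray());
    
    agg.collect(v1);
    agg.collect(v2);
    
    // Two distinct UIDs is below MAX (20) and IGNORE was never seen,
    // so the merged value keeps the full UID list and COUNT == 2
    Uid.List merged = Uid.List.parseFrom(agg.aggregate().get());
    System.out.println(merged.getCOUNT() + " " + merged.getUIDList()); // 2, both UIDs (order not guaranteed)
  }
}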

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package aggregator;
 
 import java.util.ArrayList;
@@ -30,65 +30,65 @@ import com.google.protobuf.InvalidProtoc
 
 /**
  * An Aggregator to merge together a list of term offsets and one normalized term frequency
- *
+ * 
  */
 public class TextIndexAggregator implements Aggregator {
-    private static final Logger log = Logger.getLogger(TextIndexAggregator.class);
+  private static final Logger log = Logger.getLogger(TextIndexAggregator.class);
+  
+  private List<Integer> offsets = new ArrayList<Integer>();
+  private TermWeight.Info.Builder builder = TermWeight.Info.newBuilder();
+  private float normalizedTermFrequency = 0f;
+  
+  @Override
+  public Value aggregate() {
+    // Keep the sorted order we tried to maintain
+    for (int i = 0; i < offsets.size(); ++i) {
+      builder.addWordOffset(offsets.get(i));
+    }
     
-    private List<Integer> offsets = new ArrayList<Integer>();
-    private TermWeight.Info.Builder builder = TermWeight.Info.newBuilder();
-    private float normalizedTermFrequency = 0f;
-
-    @Override
-    public Value aggregate() {
-        // Keep the sorted order we tried to maintain
-        for (int i = 0; i < offsets.size(); ++i) {
-            builder.addWordOffset(offsets.get(i));
-        }
-        
-        builder.setNormalizedTermFrequency(normalizedTermFrequency);
-        
-        return new Value(builder.build().toByteArray());
+    builder.setNormalizedTermFrequency(normalizedTermFrequency);
+    
+    return new Value(builder.build().toByteArray());
+  }
+  
+  @Override
+  public void collect(Value value) {
+    // Make sure we don't aggregate something else
+    if (value == null || value.get().length == 0) {
+      return;
     }
-
-    @Override
-    public void collect(Value value) {
-        // Make sure we don't aggregate something else
-        if (value == null || value.get().length == 0) {
-            return;
-        }
-        
-        TermWeight.Info info;
-        
-        try {
-            info = TermWeight.Info.parseFrom(value.get());
-        } catch (InvalidProtocolBufferException e) {
-            log.error("Value passed to aggregator was not of type TermWeight.Info", e);
-            return;
-        }
-
-        // Add each offset into the list maintaining sorted order
-        for (int offset : info.getWordOffsetList()) {
-            int pos = Collections.binarySearch(offsets, offset);
-            
-            if (pos < 0) {
-                // Undo the transform on the insertion point
-                offsets.add((-1 * pos) - 1, offset); 
-            } else {
-                offsets.add(pos, offset);
-            }
-        }
-        
-        if (info.getNormalizedTermFrequency() > 0) {
-            this.normalizedTermFrequency += info.getNormalizedTermFrequency();
-        }
+    
+    TermWeight.Info info;
+    
+    try {
+      info = TermWeight.Info.parseFrom(value.get());
+    } catch (InvalidProtocolBufferException e) {
+      log.error("Value passed to aggregator was not of type TermWeight.Info", e);
+      return;
     }
-
-    @Override
-    public void reset() {
-        this.offsets.clear();
-        this.normalizedTermFrequency = 0f;
-        this.builder = TermWeight.Info.newBuilder();
+    
+    // Add each offset into the list maintaining sorted order
+    for (int offset : info.getWordOffsetList()) {
+      int pos = Collections.binarySearch(offsets, offset);
+      
+      if (pos < 0) {
+        // Undo the transform on the insertion point
+        offsets.add((-1 * pos) - 1, offset);
+      } else {
+        offsets.add(pos, offset);
+      }
     }
-
+    
+    if (info.getNormalizedTermFrequency() > 0) {
+      this.normalizedTermFrequency += info.getNormalizedTermFrequency();
+    }
+  }
+  
+  @Override
+  public void reset() {
+    this.offsets.clear();
+    this.normalizedTermFrequency = 0f;
+    this.builder = TermWeight.Info.newBuilder();
+  }
+  
 }
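
The sorted-insert idiom in collect() above relies on the java.util.Collections.binarySearch contract: when the key is absent, it returns (-(insertion point) - 1), so ((-1 * pos) - 1) recovers the index that keeps the offset list sorted. A standalone sketch with hypothetical offsets:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortedInsertSketch {
  public static void main(String[] args) {
    List<Integer> offsets = new ArrayList<Integer>();
    for (int offset : new int[] {42, 7, 19, 7}) {
      int pos = Collections.binarySearch(offsets, offset);
      if (pos < 0) {
        // Undo the transform on the insertion point, as in TextIndexAggregator
        offsets.add((-1 * pos) - 1, offset);
      } else {
        // Duplicate offset: insert next to the existing value
        offsets.add(pos, offset);
      }
    }
    System.out.println(offsets); // [7, 7, 19, 42]
  }
}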

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package ingest;
 
 import java.io.Reader;
@@ -31,138 +31,142 @@ import normalizer.LcNoDiacriticsNormaliz
 import normalizer.NumberNormalizer;
 
 public class ArticleExtractor {
-	
-	public final static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z");
-	private static NumberNormalizer nn = new NumberNormalizer();
-	private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer();
-	
-	public static class Article {
-		int id;
-		String title;
-		long timestamp;
-		String comments;
-		String text;
-		
-		private Article(int id, String title, long timestamp, String comments, String text) {
-			super();
-			this.id = id;
-			this.title = title;
-			this.timestamp = timestamp;
-			this.comments = comments;
-			this.text = text;
-		}
-		public int getId() {
-			return id;
-		}
-		public String getTitle() {
-			return title;
-		}
-		public String getComments() {
-			return comments;
-		}
-		public String getText() {
-			return text;
-		}
-		public long getTimestamp() {
-			return timestamp;
-		}
-		
-		public Map<String,Object> getFieldValues() {
-			Map<String,Object> fields = new HashMap<String,Object>();
-			fields.put("ID", this.id);
-			fields.put("TITLE", this.title);
-			fields.put("TIMESTAMP", this.timestamp);
-			fields.put("COMMENTS", this.comments);
-			return fields;
-		}
-		
-		public Map<String,String> getNormalizedFieldValues() {
-			Map<String,String> fields = new HashMap<String,String>();
-			fields.put("ID", nn.normalizeFieldValue("ID", this.id));
-			fields.put("TITLE", lcdn.normalizeFieldValue("TITLE", this.title));
-			fields.put("TIMESTAMP", nn.normalizeFieldValue("TIMESTAMP", this.timestamp));
-			fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments));
-			return fields;
-		}
-		
-	}
-	
-	public ArticleExtractor() {
-	}
-
-	public Article extract(Reader reader)  {
-		XMLInputFactory xmlif = XMLInputFactory.newInstance();
-		xmlif.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.TRUE);
-
-		XMLStreamReader xmlr = null;
-
-		try {
-			xmlr = xmlif.createXMLStreamReader(reader);
-		} catch (XMLStreamException e1) {
-			throw new RuntimeException(e1);
-		}
-
-		QName titleName = QName.valueOf("title");
-		QName textName = QName.valueOf("text");
-		QName revisionName = QName.valueOf("revision");
-		QName timestampName = QName.valueOf("timestamp");
-		QName commentName = QName.valueOf("comment");
-		QName idName = QName.valueOf("id");
-
-		Map<QName, StringBuilder> tags = new HashMap<QName, StringBuilder>();
-		for (QName tag : new QName[] { titleName, textName, timestampName, commentName, idName }) {
-			tags.put(tag, new StringBuilder());
-		}
-
-		StringBuilder articleText = tags.get(textName);
-		StringBuilder titleText = tags.get(titleName);
-		StringBuilder timestampText = tags.get(timestampName);
-		StringBuilder commentText = tags.get(commentName);
-		StringBuilder idText = tags.get(idName);
-
-		StringBuilder current = null;
-		boolean inRevision = false;
-		while (true) {
-			try {
-				if (!xmlr.hasNext())
-					break;
-				xmlr.next();
-			} catch (XMLStreamException e) {
-				throw new RuntimeException(e);
-			}
-			QName currentName = null;
-			if (xmlr.hasName()) {
-				currentName = xmlr.getName();
-			}
-			if (xmlr.isStartElement() && tags.containsKey(currentName)) {
-				if (!inRevision || (!currentName.equals(revisionName) && !currentName.equals(idName))) {
-					current = tags.get(currentName);
-					current.setLength(0);
-				}
-			} else if (xmlr.isStartElement() && currentName.equals(revisionName)) {
-				inRevision = true;
-			} else if (xmlr.isEndElement() && currentName.equals(revisionName)) {
-				inRevision = false;
-			} else if (xmlr.isEndElement() && current != null) {
-				if (textName.equals(currentName)) {
-					
-					String title = titleText.toString();
-					String text = articleText.toString();
-					String comment = commentText.toString();
-					int id = Integer.parseInt(idText.toString());
-					long timestamp;
-					try {
-						timestamp = dateFormat.parse(timestampText.append("+0000").toString()).getTime();
-						return new Article(id, title, timestamp, comment, text);
-					} catch (ParseException e) {
-						return null;
-					}
-				}
-				current = null;
-			} else if (current != null && xmlr.hasText()) {
-				current.append(xmlr.getText());
-			}
-		}
-		return null;
-	}
+  
+  public final static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z");
+  private static NumberNormalizer nn = new NumberNormalizer();
+  private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer();
+  
+  public static class Article {
+    int id;
+    String title;
+    long timestamp;
+    String comments;
+    String text;
+    
+    private Article(int id, String title, long timestamp, String comments, String text) {
+      super();
+      this.id = id;
+      this.title = title;
+      this.timestamp = timestamp;
+      this.comments = comments;
+      this.text = text;
+    }
+    
+    public int getId() {
+      return id;
+    }
+    
+    public String getTitle() {
+      return title;
+    }
+    
+    public String getComments() {
+      return comments;
+    }
+    
+    public String getText() {
+      return text;
+    }
+    
+    public long getTimestamp() {
+      return timestamp;
+    }
+    
+    public Map<String,Object> getFieldValues() {
+      Map<String,Object> fields = new HashMap<String,Object>();
+      fields.put("ID", this.id);
+      fields.put("TITLE", this.title);
+      fields.put("TIMESTAMP", this.timestamp);
+      fields.put("COMMENTS", this.comments);
+      return fields;
+    }
+    
+    public Map<String,String> getNormalizedFieldValues() {
+      Map<String,String> fields = new HashMap<String,String>();
+      fields.put("ID", nn.normalizeFieldValue("ID", this.id));
+      fields.put("TITLE", lcdn.normalizeFieldValue("TITLE", this.title));
+      fields.put("TIMESTAMP", nn.normalizeFieldValue("TIMESTAMP", this.timestamp));
+      fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments));
+      return fields;
+    }
+    
+  }
+  
+  public ArticleExtractor() {}
+  
+  public Article extract(Reader reader) {
+    XMLInputFactory xmlif = XMLInputFactory.newInstance();
+    xmlif.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.TRUE);
+    
+    XMLStreamReader xmlr = null;
+    
+    try {
+      xmlr = xmlif.createXMLStreamReader(reader);
+    } catch (XMLStreamException e1) {
+      throw new RuntimeException(e1);
+    }
+    
+    QName titleName = QName.valueOf("title");
+    QName textName = QName.valueOf("text");
+    QName revisionName = QName.valueOf("revision");
+    QName timestampName = QName.valueOf("timestamp");
+    QName commentName = QName.valueOf("comment");
+    QName idName = QName.valueOf("id");
+    
+    Map<QName,StringBuilder> tags = new HashMap<QName,StringBuilder>();
+    for (QName tag : new QName[] {titleName, textName, timestampName, commentName, idName}) {
+      tags.put(tag, new StringBuilder());
+    }
+    
+    StringBuilder articleText = tags.get(textName);
+    StringBuilder titleText = tags.get(titleName);
+    StringBuilder timestampText = tags.get(timestampName);
+    StringBuilder commentText = tags.get(commentName);
+    StringBuilder idText = tags.get(idName);
+    
+    StringBuilder current = null;
+    boolean inRevision = false;
+    while (true) {
+      try {
+        if (!xmlr.hasNext())
+          break;
+        xmlr.next();
+      } catch (XMLStreamException e) {
+        throw new RuntimeException(e);
+      }
+      QName currentName = null;
+      if (xmlr.hasName()) {
+        currentName = xmlr.getName();
+      }
+      if (xmlr.isStartElement() && tags.containsKey(currentName)) {
+        if (!inRevision || (!currentName.equals(revisionName) && !currentName.equals(idName))) {
+          current = tags.get(currentName);
+          current.setLength(0);
+        }
+      } else if (xmlr.isStartElement() && currentName.equals(revisionName)) {
+        inRevision = true;
+      } else if (xmlr.isEndElement() && currentName.equals(revisionName)) {
+        inRevision = false;
+      } else if (xmlr.isEndElement() && current != null) {
+        if (textName.equals(currentName)) {
+          
+          String title = titleText.toString();
+          String text = articleText.toString();
+          String comment = commentText.toString();
+          int id = Integer.parseInt(idText.toString());
+          long timestamp;
+          try {
+            timestamp = dateFormat.parse(timestampText.append("+0000").toString()).getTime();
+            return new Article(id, title, timestamp, comment, text);
+          } catch (ParseException e) {
+            return null;
+          }
+        }
+        current = null;
+      } else if (current != null && xmlr.hasText()) {
+        current.append(xmlr.getText());
+      }
+    }
+    return null;
+  }
 }
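
The timestamp handling in extract() above is worth noting: Wikipedia dump timestamps end in a literal 'Z', so the code appends "+0000" and lets the trailing RFC 822 'Z' pattern letter resolve the zone to UTC regardless of the local JVM time zone. A minimal sketch with a hypothetical timestamp:

import java.text.SimpleDateFormat;

public class TimestampSketch {
  public static void main(String[] args) throws Exception {
    // Same pattern as ArticleExtractor.dateFormat: quoted 'T' and 'Z' literals,
    // then an RFC 822 zone (e.g. +0000) matched by the final pattern letter Z
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z");
    long millis = dateFormat.parse("2011-12-05T20:05:49Z" + "+0000").getTime();
    System.out.println(millis); // epoch millis pinned to UTC
  }
}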

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package ingest;
 
 import java.io.IOException;
@@ -31,113 +31,120 @@ import org.apache.accumulo.core.client.C
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 
-
 public class WikipediaConfiguration {
-	public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
-	public final static String USER = "wikipedia.accumulo.user";
-	public final static String PASSWORD = "wikipedia.accumulo.password";
-	public final static String TABLE_NAME = "wikipedia.accumulo.table";
-	
-	public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
-
-	public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
-	public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
-	public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
-		
-	public final static String ANALYZER = "wikipedia.index.analyzer";
-	
-	public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
-	
-	public static String getUser(Configuration conf) { return conf.get(USER); };
-	
-	public static byte[] getPassword(Configuration conf) { 
-		String pass = conf.get(PASSWORD);
-		if (pass == null) {
-			return null;
-		}
-		return pass.getBytes();
-	}
-	
-	public static String getTableName(Configuration conf) {
-		String tablename = conf.get(TABLE_NAME);
-		if (tablename == null) {
-			throw new RuntimeException("No data table name specified in " + TABLE_NAME);
-		}
-		return tablename;
-	}
-	
-	public static String getInstanceName(Configuration conf) { return conf.get(INSTANCE_NAME); }
-	
-	public static String getZookeepers(Configuration conf) {
-		String zookeepers = conf.get(ZOOKEEPERS);
-		if (zookeepers == null) {
-			throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
-		}
-		return zookeepers;
-	}
-		
-	public static Path getNamespacesFile(Configuration conf) {
-		String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
-		return new Path(filename);
-	}
-	public static Path getLanguagesFile(Configuration conf) {
-		String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
-		return new Path(filename);
-	}
-	public static Path getWorkingDirectory(Configuration conf) {
-		String filename = conf.get(WORKING_DIRECTORY);
-		return new Path(filename);
-	}
-	
-	public static Analyzer getAnalyzer(Configuration conf) throws IOException {
-		Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
-		return ReflectionUtils.newInstance(analyzerClass, conf);
-	}
-	
-	public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
-		return new Connector(getInstance(conf), getUser(conf), getPassword(conf));
-	}
-
-	public static Instance getInstance(Configuration conf) {
-		return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
-	}
-	
-	public static int getNumPartitions(Configuration conf) {
-		return conf.getInt(NUM_PARTITIONS, 25);
-	}
-	
-	/**
-	 * Helper method to get properties from Hadoop configuration
-	 * @param <T>
-	 * @param conf
-	 * @param propertyName
-	 * @param resultClass
-	 * @throws IllegalArgumentException if property is not defined, null, or empty. Or if resultClass is not handled.
-	 * @return value of property
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
-		String p = conf.get(propertyName);
-		if (StringUtils.isEmpty(p))
-			throw new IllegalArgumentException(propertyName + " must be specified");
-		
-		if (resultClass.equals(String.class))
-			return (T) p;
-		else if (resultClass.equals(String[].class))
-			return (T) conf.getStrings(propertyName);
-		else if (resultClass.equals(Boolean.class))
-			return (T) Boolean.valueOf(p);
-		else if (resultClass.equals(Long.class))
-			return (T) Long.valueOf(p);
-		else if (resultClass.equals(Integer.class))
-			return (T) Integer.valueOf(p);
-		else if (resultClass.equals(Float.class))
-			return (T) Float.valueOf(p);
-		else if (resultClass.equals(Double.class))
-			return (T) Double.valueOf(p);
-		else
-			throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
-			
-	}
-
+  public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
+  public final static String USER = "wikipedia.accumulo.user";
+  public final static String PASSWORD = "wikipedia.accumulo.password";
+  public final static String TABLE_NAME = "wikipedia.accumulo.table";
+  
+  public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
+  
+  public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
+  public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
+  public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
+  
+  public final static String ANALYZER = "wikipedia.index.analyzer";
+  
+  public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+  
+  public static String getUser(Configuration conf) {
+    return conf.get(USER);
+  };
+  
+  public static byte[] getPassword(Configuration conf) {
+    String pass = conf.get(PASSWORD);
+    if (pass == null) {
+      return null;
+    }
+    return pass.getBytes();
+  }
+  
+  public static String getTableName(Configuration conf) {
+    String tablename = conf.get(TABLE_NAME);
+    if (tablename == null) {
+      throw new RuntimeException("No data table name specified in " + TABLE_NAME);
+    }
+    return tablename;
+  }
+  
+  public static String getInstanceName(Configuration conf) {
+    return conf.get(INSTANCE_NAME);
+  }
+  
+  public static String getZookeepers(Configuration conf) {
+    String zookeepers = conf.get(ZOOKEEPERS);
+    if (zookeepers == null) {
+      throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
+    }
+    return zookeepers;
+  }
+  
+  public static Path getNamespacesFile(Configuration conf) {
+    String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
+    return new Path(filename);
+  }
+  
+  public static Path getLanguagesFile(Configuration conf) {
+    String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
+    return new Path(filename);
+  }
+  
+  public static Path getWorkingDirectory(Configuration conf) {
+    String filename = conf.get(WORKING_DIRECTORY);
+    return new Path(filename);
+  }
+  
+  public static Analyzer getAnalyzer(Configuration conf) throws IOException {
+    Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
+    return ReflectionUtils.newInstance(analyzerClass, conf);
+  }
+  
+  public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+    return new Connector(getInstance(conf), getUser(conf), getPassword(conf));
+  }
+  
+  public static Instance getInstance(Configuration conf) {
+    return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
+  }
+  
+  public static int getNumPartitions(Configuration conf) {
+    return conf.getInt(NUM_PARTITIONS, 25);
+  }
+  
+  /**
+   * Helper method to get properties from Hadoop configuration
+   * 
+   * @param <T>
+   * @param conf
+   * @param propertyName
+   * @param resultClass
+   * @throws IllegalArgumentException
+   *           if property is not defined, null, or empty. Or if resultClass is not handled.
+   * @return value of property
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
+    String p = conf.get(propertyName);
+    if (StringUtils.isEmpty(p))
+      throw new IllegalArgumentException(propertyName + " must be specified");
+    
+    if (resultClass.equals(String.class))
+      return (T) p;
+    else if (resultClass.equals(String[].class))
+      return (T) conf.getStrings(propertyName);
+    else if (resultClass.equals(Boolean.class))
+      return (T) Boolean.valueOf(p);
+    else if (resultClass.equals(Long.class))
+      return (T) Long.valueOf(p);
+    else if (resultClass.equals(Integer.class))
+      return (T) Integer.valueOf(p);
+    else if (resultClass.equals(Float.class))
+      return (T) Float.valueOf(p);
+    else if (resultClass.equals(Double.class))
+      return (T) Double.valueOf(p);
+    else
+      throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
+    
+  }
+  
 }
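
A minimal sketch of driving these accessors from a Hadoop Configuration, using only the property constants and methods defined above; the instance, zookeeper, and table values are hypothetical placeholders:

import org.apache.hadoop.conf.Configuration;

import ingest.WikipediaConfiguration;

public class WikipediaConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(WikipediaConfiguration.INSTANCE_NAME, "exampleInstance");
    conf.set(WikipediaConfiguration.ZOOKEEPERS, "zkhost1:2181,zkhost2:2181");
    conf.set(WikipediaConfiguration.TABLE_NAME, "wikipedia");
    
    // Unset values either return null (getUser) or throw a RuntimeException
    // (getTableName, getZookeepers), per the implementations above
    System.out.println(WikipediaConfiguration.getTableName(conf));
    System.out.println(WikipediaConfiguration.getZookeepers(conf));
    
    // isNull() is the generic accessor: it rejects missing or empty values
    // and converts the raw string to the requested type
    conf.set(WikipediaConfiguration.NUM_PARTITIONS, "10");
    Integer partitions = WikipediaConfiguration.isNull(conf, WikipediaConfiguration.NUM_PARTITIONS, Integer.class);
    System.out.println(partitions); // 10
  }
}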

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package ingest;
 
 import java.io.IOException;
@@ -54,158 +54,156 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.aggregation.conf.AggregatorConfiguration;
 
 public class WikipediaIngester extends Configured implements Tool {
-	
-	public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
-	public final static String SPLIT_FILE = "wikipedia.split_file";
-	public final static String TABLE_NAME = "wikipedia.table";
-	
-
-	public static void main(String[] args) throws Exception {
-		int res = ToolRunner.run(new Configuration(), new WikipediaIngester(), args);
-		System.exit(res);
-	}
-		
-	private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException {
-		//Create the shard table
-		String indexTableName = tableName + "Index";
-		String reverseIndexTableName = tableName + "ReverseIndex";
-		String metadataTableName = tableName + "Metadata";
-
-		//create the shard table
-		if (!tops.exists(tableName)) {
-            // Set a text index aggregator on the given field names. No aggregator is set if the option is not supplied 
-            String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME;
-            
-            if (textIndexFamilies.length() > 0) {
-                System.out.println("Adding content aggregator on the fields: " + textIndexFamilies);
-                
-                // Create and set the aggregators in one shot
-                List<AggregatorConfiguration> aggregators = new ArrayList<AggregatorConfiguration>();
-
-                for (String family : StringUtils.split(textIndexFamilies, ',')) {
-                    aggregators.add(new AggregatorConfiguration(new Text("fi\0" + family), aggregator.TextIndexAggregator.class.getName()));
-                }
-                
-                tops.create(tableName);
-		tops.addAggregators(tableName, aggregators);
-            } else {
-                tops.create(tableName);
-            }
-            
-            // Set the locality group for the full content column family
-            tops.setLocalityGroups(tableName,
-                    Collections.singletonMap("WikipediaDocuments", 
-                            Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY))));
-			
-		}
-		
-		if (!tops.exists(indexTableName)) {
-            tops.create(indexTableName);
-            //Add the UID aggregator
-            for (IteratorScope scope : IteratorScope.values()) {
-                String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator");
-                tops.setProperty(indexTableName, stem, "19,iterator.TotalAggregatingIterator");
-                stem += ".opt.";
-                tops.setProperty(indexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator");
-              
-            }    
-		}
-
-		if (!tops.exists(reverseIndexTableName)) {
-            tops.create(reverseIndexTableName);
-            //Add the UID aggregator
-            for (IteratorScope scope : IteratorScope.values()) {
-                String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator");
-                tops.setProperty(reverseIndexTableName, stem, "19,iterator.TotalAggregatingIterator");
-                stem += ".opt.";
-                tops.setProperty(reverseIndexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator");
-              
-            }    
-		}
-
-		if (!tops.exists(metadataTableName)) {
-        	//Add the NumSummation aggregator for the frequency column
-            List<AggregatorConfiguration> aggregators = new ArrayList<AggregatorConfiguration>();
-            aggregators.add(new AggregatorConfiguration(new Text("f"), NumSummation.class.getName()));
-        	tops.create(metadataTableName);
-        	tops.addAggregators(metadataTableName, aggregators);
-		}
-	}
-
-	@Override
-	public int run(String[] args) throws Exception {
-		Job job = new Job(getConf(), "Ingest Wikipedia");
-		Configuration conf = job.getConfiguration();
-
-
-		String tablename = WikipediaConfiguration.getTableName(conf);
-
-		String zookeepers = WikipediaConfiguration.getZookeepers(conf);
-		String instanceName = WikipediaConfiguration.getInstanceName(conf);
-		
-		String user = WikipediaConfiguration.getUser(conf);
-		byte[] password = WikipediaConfiguration.getPassword(conf);
-		Connector connector = WikipediaConfiguration.getConnector(conf);
-		
-		TableOperations tops = connector.tableOperations();
-		
-		createTables(tops, tablename);
-		
-		configureJob(job);
-
-		List<Path> inputPaths = new ArrayList<Path>();
-		SortedSet<String> languages = new TreeSet<String>();
-		FileSystem fs = FileSystem.get(conf);
-		Path parent = new Path(conf.get("wikipedia.input"));
-		listFiles(parent, fs, inputPaths, languages);
-		
-		System.out.println("Input files in " + parent + ":" +  inputPaths.size());
-		Path[] inputPathsArray = new Path[inputPaths.size()];
-		inputPaths.toArray(inputPathsArray);
-		
-		System.out.println("Languages:" +  languages.size());
-						
-		FileInputFormat.setInputPaths(job, inputPathsArray);
-		
-		job.setMapperClass(WikipediaMapper.class);		
-		job.setNumReduceTasks(0);
-		job.setMapOutputKeyClass(Text.class);
-		job.setMapOutputValueClass(Mutation.class);
-		job.setOutputFormatClass(AccumuloOutputFormat.class);
-		AccumuloOutputFormat.setOutputInfo(job, user, password, true, tablename);
-		AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, zookeepers);
-		
-		return job.waitForCompletion(true) ? 0 : 1;
-	}
-
-	public final static PathFilter partFilter = new PathFilter() {
-		@Override
-		public boolean accept(Path path) {
-			return path.getName().startsWith("part");
-		};
-	};
-
-	protected void configureJob(Job job) {
-		Configuration conf = job.getConfiguration();
-		job.setJarByClass(WikipediaIngester.class);
-		job.setInputFormatClass(WikipediaInputFormat.class);
-		conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
-		conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
-	}
-	
-	protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
-	protected void listFiles(Path path, FileSystem fs, List<Path> files, Set<String> languages) throws IOException {
-		for (FileStatus status : fs.listStatus(path)) {
-			if (status.isDir()) {
-				listFiles(status.getPath(), fs, files, languages);
-			} else {
-				Path p = status.getPath();
-				Matcher matcher = filePattern.matcher(p.getName());
-				if (matcher.matches()) {
-					languages.add(matcher.group(1));
-					files.add(p);
-				}
-			}
-		}
-	}
+  
+  public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
+  public final static String SPLIT_FILE = "wikipedia.split_file";
+  public final static String TABLE_NAME = "wikipedia.table";
+  
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new WikipediaIngester(), args);
+    System.exit(res);
+  }
+  
+  private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
+      TableExistsException {
+    // Create the shard table
+    String indexTableName = tableName + "Index";
+    String reverseIndexTableName = tableName + "ReverseIndex";
+    String metadataTableName = tableName + "Metadata";
+    
+    // create the shard table
+    if (!tops.exists(tableName)) {
+      // Set a text index aggregator on the given field names. No aggregator is set if the option is not supplied
+      String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME;
+      
+      if (textIndexFamilies.length() > 0) {
+        System.out.println("Adding content aggregator on the fields: " + textIndexFamilies);
+        
+        // Create and set the aggregators in one shot
+        List<AggregatorConfiguration> aggregators = new ArrayList<AggregatorConfiguration>();
+        
+        for (String family : StringUtils.split(textIndexFamilies, ',')) {
+          aggregators.add(new AggregatorConfiguration(new Text("fi\0" + family), aggregator.TextIndexAggregator.class.getName()));
+        }
+        
+        tops.create(tableName);
+        tops.addAggregators(tableName, aggregators);
+      } else {
+        tops.create(tableName);
+      }
+      
+      // Set the locality group for the full content column family
+      tops.setLocalityGroups(tableName, Collections.singletonMap("WikipediaDocuments", Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY))));
+      
+    }
+    
+    if (!tops.exists(indexTableName)) {
+      tops.create(indexTableName);
+      // Add the UID aggregator
+      for (IteratorScope scope : IteratorScope.values()) {
+        String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator");
+        tops.setProperty(indexTableName, stem, "19,iterator.TotalAggregatingIterator");
+        stem += ".opt.";
+        tops.setProperty(indexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator");
+        
+      }
+    }
+    
+    if (!tops.exists(reverseIndexTableName)) {
+      tops.create(reverseIndexTableName);
+      // Add the UID aggregator
+      for (IteratorScope scope : IteratorScope.values()) {
+        String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator");
+        tops.setProperty(reverseIndexTableName, stem, "19,iterator.TotalAggregatingIterator");
+        stem += ".opt.";
+        tops.setProperty(reverseIndexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator");
+        
+      }
+    }
+    
+    if (!tops.exists(metadataTableName)) {
+      // Add the NumSummation aggregator for the frequency column
+      List<AggregatorConfiguration> aggregators = new ArrayList<AggregatorConfiguration>();
+      aggregators.add(new AggregatorConfiguration(new Text("f"), NumSummation.class.getName()));
+      tops.create(metadataTableName);
+      tops.addAggregators(metadataTableName, aggregators);
+    }
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    Job job = new Job(getConf(), "Ingest Wikipedia");
+    Configuration conf = job.getConfiguration();
+    
+    String tablename = WikipediaConfiguration.getTableName(conf);
+    
+    String zookeepers = WikipediaConfiguration.getZookeepers(conf);
+    String instanceName = WikipediaConfiguration.getInstanceName(conf);
+    
+    String user = WikipediaConfiguration.getUser(conf);
+    byte[] password = WikipediaConfiguration.getPassword(conf);
+    Connector connector = WikipediaConfiguration.getConnector(conf);
+    
+    TableOperations tops = connector.tableOperations();
+    
+    createTables(tops, tablename);
+    
+    configureJob(job);
+    
+    List<Path> inputPaths = new ArrayList<Path>();
+    SortedSet<String> languages = new TreeSet<String>();
+    FileSystem fs = FileSystem.get(conf);
+    Path parent = new Path(conf.get("wikipedia.input"));
+    listFiles(parent, fs, inputPaths, languages);
+    
+    System.out.println("Input files in " + parent + ":" + inputPaths.size());
+    Path[] inputPathsArray = new Path[inputPaths.size()];
+    inputPaths.toArray(inputPathsArray);
+    
+    System.out.println("Languages:" + languages.size());
+    
+    FileInputFormat.setInputPaths(job, inputPathsArray);
+    
+    job.setMapperClass(WikipediaMapper.class);
+    job.setNumReduceTasks(0);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Mutation.class);
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    AccumuloOutputFormat.setOutputInfo(job, user, password, true, tablename);
+    AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, zookeepers);
+    
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+  
+  public final static PathFilter partFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith("part");
+    }
+  };
+  
+  protected void configureJob(Job job) {
+    Configuration conf = job.getConfiguration();
+    job.setJarByClass(WikipediaIngester.class);
+    job.setInputFormatClass(WikipediaInputFormat.class);
+    conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
+    conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
+  }
+  
+  protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*\\.xml(\\.bz2)?");
+  
+  protected void listFiles(Path path, FileSystem fs, List<Path> files, Set<String> languages) throws IOException {
+    for (FileStatus status : fs.listStatus(path)) {
+      if (status.isDir()) {
+        listFiles(status.getPath(), fs, files, languages);
+      } else {
+        Path p = status.getPath();
+        Matcher matcher = filePattern.matcher(p.getName());
+        if (matcher.matches()) {
+          languages.add(matcher.group(1));
+          files.add(p);
+        }
+      }
+    }
+  }
 }
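For reference, the UID aggregator wiring above reduces to six table properties, two per iterator scope, on each of the index and reverse index tables. Below is a minimal standalone sketch of the keys createTables() generates; it assumes Property.TABLE_ITERATOR_PREFIX expands to "table.iterator." and that the iterator scopes are majc, minc, and scan, as in Accumulo at the time of this commit.

    // Standalone sketch only: mirrors the loop in createTables() without
    // depending on Accumulo classes. The prefix and scope names below are
    // the assumptions stated above.
    public class UidAggregatorPropsSketch {
      private static final String TABLE_ITERATOR_PREFIX = "table.iterator.";
      
      public static void main(String[] args) {
        for (String scope : new String[] {"majc", "minc", "scan"}) {
          String stem = String.format("%s%s.%s", TABLE_ITERATOR_PREFIX, scope, "UIDAggregator");
          // Priority 19 plus the iterator class that performs the aggregation
          System.out.println(stem + " = 19,iterator.TotalAggregatingIterator");
          // The .opt.* option applies GlobalIndexUidAggregator to every column
          System.out.println(stem + ".opt.* = aggregator.GlobalIndexUidAggregator");
        }
      }
    }

Running the sketch prints the property assignments the code sets, e.g. table.iterator.scan.UIDAggregator = 19,iterator.TotalAggregatingIterator.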

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package ingest;
 
 import org.apache.hadoop.fs.Path;
@@ -28,15 +28,15 @@ import org.apache.hadoop.mapreduce.lib.i
 import reader.AggregatingRecordReader;
 
 public class WikipediaInputFormat extends TextInputFormat {
-
-	@Override
-	public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
-		return new AggregatingRecordReader();
-	}
-
-	@Override
-	protected boolean isSplitable(JobContext context, Path file) {
-		return false;
-	}
-
+  
+  @Override
+  public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+    return new AggregatingRecordReader();
+  }
+  
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    return false;
+  }
+  
 }
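WikipediaInputFormat hands each <page>...</page> element to the mapper as one record and disables splitting, so an element is never divided across mappers. The loop below is a simplified, self-contained sketch of that aggregation, using the START_TOKEN/END_TOKEN values that configureJob() sets; the real AggregatingRecordReader additionally tracks byte offsets and handles tokens that straddle read buffers.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;
    
    public class PageAggregationSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical two-page dump fragment
        String xml = "<mediawiki>\n<page>\n<title>A</title>\n</page>\n<page>\n<title>B</title>\n</page>\n</mediawiki>";
        BufferedReader in = new BufferedReader(new StringReader(xml));
        StringBuilder record = new StringBuilder();
        boolean inPage = false;
        String line;
        while ((line = in.readLine()) != null) {
          if (line.contains("<page>"))
            inPage = true;
          if (inPage)
            record.append(line).append('\n');
          if (line.contains("</page>")) {
            // One aggregated record per page element; this is the Text value
            // that WikipediaMapper.map() receives
            System.out.println("record:\n" + record);
            record.setLength(0);
            inPage = false;
          }
        }
      }
    }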

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 /**
  * 
  */
@@ -65,200 +65,195 @@ import org.apache.accumulo.core.security
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
-public class WikipediaMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-	
-	private static final Logger log = Logger.getLogger(WikipediaMapper.class);
-	
-	public final static Charset UTF8 = Charset.forName("UTF-8");
-	public static final String DOCUMENT_COLUMN_FAMILY = "d";
-	public static final String METADATA_EVENT_COLUMN_FAMILY = "e";
-	public static final String METADATA_INDEX_COLUMN_FAMILY = "i";
-	public static final String TOKENS_FIELD_NAME = "TEXT";
-	
-	private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
-	private static final Value NULL_VALUE = new Value(new byte[0]);
-	private static final String cvPrefix = "all|";
-
-	
-	private ArticleExtractor extractor;
-	private String language;
-	private int numPartitions = 0;
-	private Set<?> stopwords = null;
-	private ColumnVisibility cv = null;
-	
-
-	private Text tablename = null;
-	private Text indexTableName = null;
-	private Text reverseIndexTableName = null;
-	private Text metadataTableName = null;
-
-	
-	@Override
-	public void setup(Context context) {
-		Configuration conf = context.getConfiguration();
-		tablename = new Text(WikipediaConfiguration.getTableName(conf));
-		indexTableName = new Text(tablename + "Index");
-		reverseIndexTableName = new Text(tablename + "ReverseIndex");
-		metadataTableName = new Text(tablename + "Metadata");
-
-		FileSplit split = (FileSplit) context.getInputSplit();
-		String fileName = split.getPath().getName();
-		Matcher matcher = languagePattern.matcher(fileName);
-		if (matcher.matches()) {
-			language = matcher.group(1).replace('_', '-').toLowerCase();
-			if (language.equals("arwiki"))
-				stopwords = ArabicAnalyzer.getDefaultStopSet();
-			else if (language.equals("brwiki"))
-				stopwords = BrazilianAnalyzer.getDefaultStopSet();
-			else if (language.startsWith("zh"))
-				stopwords = CJKAnalyzer.getDefaultStopSet();
-			else if (language.equals("dewiki"))
-				stopwords = GermanAnalyzer.getDefaultStopSet();
-			else if (language.equals("elwiki"))
-				stopwords = GreekAnalyzer.getDefaultStopSet();
-			else if (language.equals("fawiki"))
-				stopwords = PersianAnalyzer.getDefaultStopSet();
-			else if (language.equals("frwiki"))
-				stopwords = FrenchAnalyzer.getDefaultStopSet();
-			else if (language.equals("nlwiki"))
-				stopwords = DutchAnalyzer.getDefaultStopSet();
-			else
-				stopwords = StopAnalyzer.ENGLISH_STOP_WORDS_SET;
-			
-		} else {
-			throw new RuntimeException("Unknown ingest language! " + fileName);
-		}
-		extractor = new ArticleExtractor();
-		numPartitions = WikipediaConfiguration.getNumPartitions(conf);
-		cv = new ColumnVisibility(cvPrefix + language);
-		
-	}
-
-	/**
-	 * We will partition the documents based on the document id
-	 * @param article
-	 * @param numPartitions
-	 * @return
-	 * @throws IllegalFormatException
-	 */
-	public static int getPartitionId(Article article, int numPartitions) throws IllegalFormatException {
-		return article.getId()  % numPartitions;
-	}
-
-	@Override
-	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-		Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
-		String NULL_BYTE = "\u0000";
-		String colfPrefix = language+NULL_BYTE;
-		String indexPrefix = "fi"+NULL_BYTE;
-		if (article != null) {
-			Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions)));
-						
-			//Create the mutations for the document.
-			//Row is partition id, colf is language0articleid, colq is fieldName\0fieldValue
-			Mutation m = new Mutation(partitionId);
-			for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
-				m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp() , NULL_VALUE);
-				//Create mutations for the metadata table.
-				Mutation mm = new Mutation(entry.getKey());
-				mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
-				context.write(metadataTableName, mm);
-			}
-
-			//Tokenize the content
-			Set<String> tokens = getTokens(article);
-			
-			//We are going to put the fields to be indexed into a multimap. This allows us to iterate
-			//over the entire set once.
-			Multimap<String,String> indexFields = HashMultimap.create();
-			//Add the normalized field values
-			LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer();
-			for (Entry<String,String> index : article.getNormalizedFieldValues().entrySet())
-				indexFields.put(index.getKey(), index.getValue());
-			//Add the tokens
-			for (String token : tokens)
-				indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token));
-
-
-			for (Entry<String,String> index : indexFields.entries()) {
-				//Create mutations for the in partition index
-				//Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id
-				m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp() , NULL_VALUE);
-
-				
-				//Create mutations for the global index
-				//Create a UID object for the Value
-				Builder uidBuilder = Uid.List.newBuilder();
-				uidBuilder.setIGNORE(false);
-				uidBuilder.setCOUNT(1);
-				uidBuilder.addUID(Integer.toString(article.getId()));
-				Uid.List uidList = uidBuilder.build();
-				Value val = new Value(uidList.toByteArray());
-
-				//Create mutations for the global index
-				//Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object
-				Mutation gm = new Mutation(index.getValue());
-				gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp() , val);
-				context.write(indexTableName, gm);
-				
-				//Create mutations for the global reverse index
-				Mutation grm = new Mutation(StringUtils.reverse(index.getValue()));
-				grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp() , val);				
-				context.write(reverseIndexTableName, grm);
-				
-				//Create mutations for the metadata table.
-				Mutation mm = new Mutation(index.getKey());
-				mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
-				context.write(metadataTableName, mm);
-
-
-			}
-			//Add the entire text to the document section of the table.
-			//row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
-			m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(),
-					new Value(Base64.encodeBytes(article.getText().getBytes(), Base64.GZIP).getBytes()));
-			context.write(tablename, m);
-			
-		} else {
-			context.getCounter("wikipedia", "invalid articles").increment(1);
-		}
-		context.progress();
-	}
-	
-	/**
-	 * Tokenize the wikipedia content
-	 * 
-	 * @param article
-	 * @return
-	 * @throws IOException
-	 */
-	private Set<String> getTokens(Article article) throws IOException {
-		Set<String> tokenList = new HashSet<String>();
-		WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
-		TermAttribute term = tok.addAttribute(TermAttribute.class);
-		StopFilter filter = new StopFilter(false, tok, stopwords, true);
-		try {
-			while (filter.incrementToken()) {
-				String token = term.term();
-				if (!StringUtils.isEmpty(token))
-					tokenList.add(token);
-			}
-		} catch (IOException e) {
-			log.error("Error tokenizing text", e);
-		} finally {
-			try {
-				tok.end();
-			} catch (IOException e) {
-				log.error("Error calling end()", e);
-			} finally  {
-				try {
-					tok.close();
-				} catch (IOException e) {
-					log.error("Error closing tokenizer", e);
-				}
-			}
-		}
-		return tokenList;
-	}
-
+public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
+  
+  private static final Logger log = Logger.getLogger(WikipediaMapper.class);
+  
+  public final static Charset UTF8 = Charset.forName("UTF-8");
+  public static final String DOCUMENT_COLUMN_FAMILY = "d";
+  public static final String METADATA_EVENT_COLUMN_FAMILY = "e";
+  public static final String METADATA_INDEX_COLUMN_FAMILY = "i";
+  public static final String TOKENS_FIELD_NAME = "TEXT";
+  
+  private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*\\.xml(\\.bz2)?");
+  private static final Value NULL_VALUE = new Value(new byte[0]);
+  private static final String cvPrefix = "all|";
+  
+  private ArticleExtractor extractor;
+  private String language;
+  private int numPartitions = 0;
+  private Set<?> stopwords = null;
+  private ColumnVisibility cv = null;
+  
+  private Text tablename = null;
+  private Text indexTableName = null;
+  private Text reverseIndexTableName = null;
+  private Text metadataTableName = null;
+  
+  @Override
+  public void setup(Context context) {
+    Configuration conf = context.getConfiguration();
+    tablename = new Text(WikipediaConfiguration.getTableName(conf));
+    indexTableName = new Text(tablename + "Index");
+    reverseIndexTableName = new Text(tablename + "ReverseIndex");
+    metadataTableName = new Text(tablename + "Metadata");
+    
+    FileSplit split = (FileSplit) context.getInputSplit();
+    String fileName = split.getPath().getName();
+    Matcher matcher = languagePattern.matcher(fileName);
+    if (matcher.matches()) {
+      language = matcher.group(1).replace('_', '-').toLowerCase();
+      if (language.equals("arwiki"))
+        stopwords = ArabicAnalyzer.getDefaultStopSet();
+      else if (language.equals("brwiki"))
+        stopwords = BrazilianAnalyzer.getDefaultStopSet();
+      else if (language.startsWith("zh"))
+        stopwords = CJKAnalyzer.getDefaultStopSet();
+      else if (language.equals("dewiki"))
+        stopwords = GermanAnalyzer.getDefaultStopSet();
+      else if (language.equals("elwiki"))
+        stopwords = GreekAnalyzer.getDefaultStopSet();
+      else if (language.equals("fawiki"))
+        stopwords = PersianAnalyzer.getDefaultStopSet();
+      else if (language.equals("frwiki"))
+        stopwords = FrenchAnalyzer.getDefaultStopSet();
+      else if (language.equals("nlwiki"))
+        stopwords = DutchAnalyzer.getDefaultStopSet();
+      else
+        stopwords = StopAnalyzer.ENGLISH_STOP_WORDS_SET;
+      
+    } else {
+      throw new RuntimeException("Unknown ingest language! " + fileName);
+    }
+    extractor = new ArticleExtractor();
+    numPartitions = WikipediaConfiguration.getNumPartitions(conf);
+    cv = new ColumnVisibility(cvPrefix + language);
+    
+  }
+  
+  /**
+   * Partition the documents based on the document id.
+   * 
+   * @param article the article being ingested
+   * @param numPartitions the total number of partitions
+   * @return the partition id, computed as the article id modulo the number of partitions
+   * @throws IllegalFormatException
+   */
+  public static int getPartitionId(Article article, int numPartitions) throws IllegalFormatException {
+    return article.getId() % numPartitions;
+  }
+  
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
+    String NULL_BYTE = "\u0000";
+    String colfPrefix = language + NULL_BYTE;
+    String indexPrefix = "fi" + NULL_BYTE;
+    if (article != null) {
+      Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions)));
+      
+      // Create the mutations for the document.
+      // Row is partition id, colf is language\0articleid, colq is fieldName\0fieldValue
+      Mutation m = new Mutation(partitionId);
+      for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
+        m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
+        // Create mutations for the metadata table.
+        Mutation mm = new Mutation(entry.getKey());
+        mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
+        context.write(metadataTableName, mm);
+      }
+      
+      // Tokenize the content
+      Set<String> tokens = getTokens(article);
+      
+      // We are going to put the fields to be indexed into a multimap. This allows us to iterate
+      // over the entire set once.
+      Multimap<String,String> indexFields = HashMultimap.create();
+      // Add the normalized field values
+      LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer();
+      for (Entry<String,String> index : article.getNormalizedFieldValues().entrySet())
+        indexFields.put(index.getKey(), index.getValue());
+      // Add the tokens
+      for (String token : tokens)
+        indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token));
+      
+      for (Entry<String,String> index : indexFields.entries()) {
+        // Create mutations for the in partition index
+        // Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id
+        m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE);
+        
+        // Create mutations for the global index
+        // Create a UID object for the Value
+        Builder uidBuilder = Uid.List.newBuilder();
+        uidBuilder.setIGNORE(false);
+        uidBuilder.setCOUNT(1);
+        uidBuilder.addUID(Integer.toString(article.getId()));
+        Uid.List uidList = uidBuilder.build();
+        Value val = new Value(uidList.toByteArray());
+        
+        // Create mutations for the global index
+        // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object
+        Mutation gm = new Mutation(index.getValue());
+        gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
+        context.write(indexTableName, gm);
+        
+        // Create mutations for the global reverse index
+        Mutation grm = new Mutation(StringUtils.reverse(index.getValue()));
+        grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
+        context.write(reverseIndexTableName, grm);
+        
+        // Create mutations for the metadata table.
+        Mutation mm = new Mutation(index.getKey());
+        mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
+        context.write(metadataTableName, mm);
+        
+      }
+      // Add the entire text to the document section of the table.
+      // Row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
+      m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(),
+          new Value(Base64.encodeBytes(article.getText().getBytes(), Base64.GZIP).getBytes()));
+      context.write(tablename, m);
+      
+    } else {
+      context.getCounter("wikipedia", "invalid articles").increment(1);
+    }
+    context.progress();
+  }
+  
+  /**
+   * Tokenize the Wikipedia content, dropping stopwords for the detected language.
+   * 
+   * @param article the article whose text is tokenized
+   * @return the set of distinct tokens found in the article text
+   * @throws IOException
+   */
+  private Set<String> getTokens(Article article) throws IOException {
+    Set<String> tokenList = new HashSet<String>();
+    WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
+    TermAttribute term = tok.addAttribute(TermAttribute.class);
+    StopFilter filter = new StopFilter(false, tok, stopwords, true);
+    try {
+      while (filter.incrementToken()) {
+        String token = term.term();
+        if (!StringUtils.isEmpty(token))
+          tokenList.add(token);
+      }
+    } catch (IOException e) {
+      log.error("Error tokenizing text", e);
+    } finally {
+      try {
+        tok.end();
+      } catch (IOException e) {
+        log.error("Error calling end()", e);
+      } finally {
+        try {
+          tok.close();
+        } catch (IOException e) {
+          log.error("Error closing tokenizer", e);
+        }
+      }
+    }
+    return tokenList;
+  }
+  
 }
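To make the layout comments in map() concrete, the sketch below prints the four kinds of keys the mapper emits, using hypothetical values (language enwiki, article id 12345, field TEXT, field value accumulo). It renders the NUL separator as \0 for readability and substitutes StringBuilder.reverse() for StringUtils.reverse().

    // Standalone illustration of the key layouts; all values are made up.
    public class KeyLayoutSketch {
      private static final String NULL_BYTE = "\u0000";
      
      public static void main(String[] args) {
        String language = "enwiki";
        int articleId = 12345;
        int numPartitions = 10;
        String field = "TEXT";
        String value = "accumulo";
        
        String partitionId = Integer.toString(articleId % numPartitions);
        String colfPrefix = language + NULL_BYTE;
        
        // Document entry: row = partition id, colf = language\0articleid
        show("doc", partitionId, colfPrefix + articleId, field + NULL_BYTE + value);
        // In-partition index: colf = fi\0fieldName, colq = fieldValue\0language\0articleid
        show("fi ", partitionId, "fi" + NULL_BYTE + field, value + NULL_BYTE + colfPrefix + articleId);
        // Global index: row = field value, colq = partitionid\0language
        show("idx", value, field, partitionId + NULL_BYTE + language);
        // Global reverse index: row is reversed to support leading-wildcard term queries
        show("rev", new StringBuilder(value).reverse().toString(), field, partitionId + NULL_BYTE + language);
      }
      
      private static void show(String table, String row, String colf, String colq) {
        System.out.println(table + ": " + (row + " " + colf + " " + colq).replace("\u0000", "\\0"));
      }
    }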