You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC
svn commit: r1210600 [1/16] - in
/incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/
ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/
ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/
ingest/src/main/java/protobuf/ ...
Author: billie
Date: Mon Dec 5 20:05:49 2011
New Revision: 1210600
URL: http://svn.apache.org/viewvc?rev=1210600&view=rev
Log:
ACCUMULO-41 formatted java and pom files
Modified:
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/iterator/TotalAggregatingIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/LcNoDiacriticsNormalizer.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/NoOpNormalizer.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/Normalizer.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/NumberNormalizer.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/protobuf/TermWeight.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/protobuf/Uid.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/AggregatingRecordReader.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LfLineReader.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LongLineRecordReader.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/util/TextUtil.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/TextIndexAggregatorTest.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/StandaloneStatusReporter.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/WikipediaMapperTest.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/normalizer/testNumberNormalizer.java
incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/reader/AggregatingRecordReaderTest.java
incubator/accumulo/trunk/contrib/accumulo_sample/pom.xml
incubator/accumulo/trunk/contrib/accumulo_sample/query-war/pom.xml
incubator/accumulo/trunk/contrib/accumulo_sample/query/pom.xml
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/function/QueryFunctions.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicTreeNode.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/DefaultIteratorEnvironment.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/EvaluatingIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/FieldIndexIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OptimizedQueryIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/jexl/Arithmetic.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/ContentLogic.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/FieldIndexQueryReWriter.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/JexlOperatorConstants.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/QueryEvaluator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/QueryParser.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/RangeCalculator.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/TreeBuilder.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/TreeNode.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Document.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Field.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Results.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/query/IQuery.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/query/Query.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java
incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml Mon Dec 5 20:05:49 2011
@@ -1,20 +1,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
<modelVersion>4.0.0</modelVersion>
<parent>
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package aggregator;
import java.util.HashSet;
@@ -27,62 +27,61 @@ import org.apache.accumulo.core.iterator
import com.google.protobuf.InvalidProtocolBufferException;
/**
- * Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization
- * for the global index and global reverse index, where the list of UIDs for events will be maintained in the
- * index for low cardinality terms (Low in this case being less than 20).
- *
+ * Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization for the global index and global reverse index, where
+ * the list of UIDs for events will be maintained in the index for low cardinality terms (Low in this case being less than 20).
+ *
*/
public class GlobalIndexUidAggregator implements Aggregator {
-
- private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);
- private Uid.List.Builder builder = Uid.List.newBuilder();
- //Using a set instead of a list so that duplicate IDs are filtered out of the list.
- private HashSet<String> uids = new HashSet<String>();
- private boolean seenIgnore = false;
- public static final int MAX = 20;
- private long count = 0;
-
- @Override
- public Value aggregate() {
- //Special case logic
- //If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true
- //However, always maintain the count
- if (uids.size() > MAX || seenIgnore) {
- builder.setCOUNT(count);
- builder.setIGNORE(true);
- builder.clearUID();
- } else {
- builder.setCOUNT(count);
- builder.setIGNORE(false);
- builder.addAllUID(uids);
- }
- return new Value(builder.build().toByteArray());
- }
-
- @Override
- public void collect(Value value) {
- if (null == value || value.get().length == 0)
- return;
- //Collect the values, which are serialized Uid.List objects
- try {
- Uid.List v = Uid.List.parseFrom(value.get());
- count = count + v.getCOUNT();
- if (v.getIGNORE()) {
- seenIgnore = true;
- }
- //Add the incoming list to this list
- uids.addAll(v.getUIDList());
- } catch (InvalidProtocolBufferException e) {
- log.error("Value passed to aggregator was not of type Uid.List", e);
- }
- }
-
- @Override
- public void reset() {
- count = 0;
- seenIgnore = false;
- builder = Uid.List.newBuilder();
- uids.clear();
- }
-
+
+ private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);
+ private Uid.List.Builder builder = Uid.List.newBuilder();
+ // Using a set instead of a list so that duplicate IDs are filtered out of the list.
+ private HashSet<String> uids = new HashSet<String>();
+ private boolean seenIgnore = false;
+ public static final int MAX = 20;
+ private long count = 0;
+
+ @Override
+ public Value aggregate() {
+ // Special case logic
+ // If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true
+ // However, always maintain the count
+ if (uids.size() > MAX || seenIgnore) {
+ builder.setCOUNT(count);
+ builder.setIGNORE(true);
+ builder.clearUID();
+ } else {
+ builder.setCOUNT(count);
+ builder.setIGNORE(false);
+ builder.addAllUID(uids);
+ }
+ return new Value(builder.build().toByteArray());
+ }
+
+ @Override
+ public void collect(Value value) {
+ if (null == value || value.get().length == 0)
+ return;
+ // Collect the values, which are serialized Uid.List objects
+ try {
+ Uid.List v = Uid.List.parseFrom(value.get());
+ count = count + v.getCOUNT();
+ if (v.getIGNORE()) {
+ seenIgnore = true;
+ }
+ // Add the incoming list to this list
+ uids.addAll(v.getUIDList());
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Value passed to aggregator was not of type Uid.List", e);
+ }
+ }
+
+ @Override
+ public void reset() {
+ count = 0;
+ seenIgnore = false;
+ builder = Uid.List.newBuilder();
+ uids.clear();
+ }
+
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package aggregator;
import java.util.ArrayList;
@@ -30,65 +30,65 @@ import com.google.protobuf.InvalidProtoc
/**
* An Aggregator to merge together a list of term offsets and one normalized term frequency
- *
+ *
*/
public class TextIndexAggregator implements Aggregator {
- private static final Logger log = Logger.getLogger(TextIndexAggregator.class);
+ private static final Logger log = Logger.getLogger(TextIndexAggregator.class);
+
+ private List<Integer> offsets = new ArrayList<Integer>();
+ private TermWeight.Info.Builder builder = TermWeight.Info.newBuilder();
+ private float normalizedTermFrequency = 0f;
+
+ @Override
+ public Value aggregate() {
+ // Keep the sorted order we tried to maintain
+ for (int i = 0; i < offsets.size(); ++i) {
+ builder.addWordOffset(offsets.get(i));
+ }
- private List<Integer> offsets = new ArrayList<Integer>();
- private TermWeight.Info.Builder builder = TermWeight.Info.newBuilder();
- private float normalizedTermFrequency = 0f;
-
- @Override
- public Value aggregate() {
- // Keep the sorted order we tried to maintain
- for (int i = 0; i < offsets.size(); ++i) {
- builder.addWordOffset(offsets.get(i));
- }
-
- builder.setNormalizedTermFrequency(normalizedTermFrequency);
-
- return new Value(builder.build().toByteArray());
+ builder.setNormalizedTermFrequency(normalizedTermFrequency);
+
+ return new Value(builder.build().toByteArray());
+ }
+
+ @Override
+ public void collect(Value value) {
+ // Make sure we don't aggregate something else
+ if (value == null || value.get().length == 0) {
+ return;
}
-
- @Override
- public void collect(Value value) {
- // Make sure we don't aggregate something else
- if (value == null || value.get().length == 0) {
- return;
- }
-
- TermWeight.Info info;
-
- try {
- info = TermWeight.Info.parseFrom(value.get());
- } catch (InvalidProtocolBufferException e) {
- log.error("Value passed to aggregator was not of type TermWeight.Info", e);
- return;
- }
-
- // Add each offset into the list maintaining sorted order
- for (int offset : info.getWordOffsetList()) {
- int pos = Collections.binarySearch(offsets, offset);
-
- if (pos < 0) {
- // Undo the transform on the insertion point
- offsets.add((-1 * pos) - 1, offset);
- } else {
- offsets.add(pos, offset);
- }
- }
-
- if (info.getNormalizedTermFrequency() > 0) {
- this.normalizedTermFrequency += info.getNormalizedTermFrequency();
- }
+
+ TermWeight.Info info;
+
+ try {
+ info = TermWeight.Info.parseFrom(value.get());
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Value passed to aggregator was not of type TermWeight.Info", e);
+ return;
}
-
- @Override
- public void reset() {
- this.offsets.clear();
- this.normalizedTermFrequency = 0f;
- this.builder = TermWeight.Info.newBuilder();
+
+ // Add each offset into the list maintaining sorted order
+ for (int offset : info.getWordOffsetList()) {
+ int pos = Collections.binarySearch(offsets, offset);
+
+ if (pos < 0) {
+ // Undo the transform on the insertion point
+ offsets.add((-1 * pos) - 1, offset);
+ } else {
+ offsets.add(pos, offset);
+ }
}
-
+
+ if (info.getNormalizedTermFrequency() > 0) {
+ this.normalizedTermFrequency += info.getNormalizedTermFrequency();
+ }
+ }
+
+ @Override
+ public void reset() {
+ this.offsets.clear();
+ this.normalizedTermFrequency = 0f;
+ this.builder = TermWeight.Info.newBuilder();
+ }
+
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package ingest;
import java.io.Reader;
@@ -31,138 +31,142 @@ import normalizer.LcNoDiacriticsNormaliz
import normalizer.NumberNormalizer;
public class ArticleExtractor {
-
- public final static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z");
- private static NumberNormalizer nn = new NumberNormalizer();
- private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer();
-
- public static class Article {
- int id;
- String title;
- long timestamp;
- String comments;
- String text;
-
- private Article(int id, String title, long timestamp, String comments, String text) {
- super();
- this.id = id;
- this.title = title;
- this.timestamp = timestamp;
- this.comments = comments;
- this.text = text;
- }
- public int getId() {
- return id;
- }
- public String getTitle() {
- return title;
- }
- public String getComments() {
- return comments;
- }
- public String getText() {
- return text;
- }
- public long getTimestamp() {
- return timestamp;
- }
-
- public Map<String,Object> getFieldValues() {
- Map<String,Object> fields = new HashMap<String,Object>();
- fields.put("ID", this.id);
- fields.put("TITLE", this.title);
- fields.put("TIMESTAMP", this.timestamp);
- fields.put("COMMENTS", this.comments);
- return fields;
- }
-
- public Map<String,String> getNormalizedFieldValues() {
- Map<String,String> fields = new HashMap<String,String>();
- fields.put("ID", nn.normalizeFieldValue("ID", this.id));
- fields.put("TITLE", lcdn.normalizeFieldValue("TITLE", this.title));
- fields.put("TIMESTAMP", nn.normalizeFieldValue("TIMESTAMP", this.timestamp));
- fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments));
- return fields;
- }
-
- }
-
- public ArticleExtractor() {
- }
-
- public Article extract(Reader reader) {
- XMLInputFactory xmlif = XMLInputFactory.newInstance();
- xmlif.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.TRUE);
-
- XMLStreamReader xmlr = null;
-
- try {
- xmlr = xmlif.createXMLStreamReader(reader);
- } catch (XMLStreamException e1) {
- throw new RuntimeException(e1);
- }
-
- QName titleName = QName.valueOf("title");
- QName textName = QName.valueOf("text");
- QName revisionName = QName.valueOf("revision");
- QName timestampName = QName.valueOf("timestamp");
- QName commentName = QName.valueOf("comment");
- QName idName = QName.valueOf("id");
-
- Map<QName, StringBuilder> tags = new HashMap<QName, StringBuilder>();
- for (QName tag : new QName[] { titleName, textName, timestampName, commentName, idName }) {
- tags.put(tag, new StringBuilder());
- }
-
- StringBuilder articleText = tags.get(textName);
- StringBuilder titleText = tags.get(titleName);
- StringBuilder timestampText = tags.get(timestampName);
- StringBuilder commentText = tags.get(commentName);
- StringBuilder idText = tags.get(idName);
-
- StringBuilder current = null;
- boolean inRevision = false;
- while (true) {
- try {
- if (!xmlr.hasNext())
- break;
- xmlr.next();
- } catch (XMLStreamException e) {
- throw new RuntimeException(e);
- }
- QName currentName = null;
- if (xmlr.hasName()) {
- currentName = xmlr.getName();
- }
- if (xmlr.isStartElement() && tags.containsKey(currentName)) {
- if (!inRevision || (!currentName.equals(revisionName) && !currentName.equals(idName))) {
- current = tags.get(currentName);
- current.setLength(0);
- }
- } else if (xmlr.isStartElement() && currentName.equals(revisionName)) {
- inRevision = true;
- } else if (xmlr.isEndElement() && currentName.equals(revisionName)) {
- inRevision = false;
- } else if (xmlr.isEndElement() && current != null) {
- if (textName.equals(currentName)) {
-
- String title = titleText.toString();
- String text = articleText.toString();
- String comment = commentText.toString();
- int id = Integer.parseInt(idText.toString());
- long timestamp;
- try {
- timestamp = dateFormat.parse(timestampText.append("+0000").toString()).getTime();
- return new Article(id, title, timestamp, comment, text);
- } catch (ParseException e) {
- return null;
- }
- }
- current = null;
- } else if (current != null && xmlr.hasText()) {
- current.append(xmlr.getText());
- }
- }
- return null;
- }
+
+ public final static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z");
+ private static NumberNormalizer nn = new NumberNormalizer();
+ private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer();
+
+ public static class Article {
+ int id;
+ String title;
+ long timestamp;
+ String comments;
+ String text;
+
+ private Article(int id, String title, long timestamp, String comments, String text) {
+ super();
+ this.id = id;
+ this.title = title;
+ this.timestamp = timestamp;
+ this.comments = comments;
+ this.text = text;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public String getComments() {
+ return comments;
+ }
+
+ public String getText() {
+ return text;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public Map<String,Object> getFieldValues() {
+ Map<String,Object> fields = new HashMap<String,Object>();
+ fields.put("ID", this.id);
+ fields.put("TITLE", this.title);
+ fields.put("TIMESTAMP", this.timestamp);
+ fields.put("COMMENTS", this.comments);
+ return fields;
+ }
+
+ public Map<String,String> getNormalizedFieldValues() {
+ Map<String,String> fields = new HashMap<String,String>();
+ fields.put("ID", nn.normalizeFieldValue("ID", this.id));
+ fields.put("TITLE", lcdn.normalizeFieldValue("TITLE", this.title));
+ fields.put("TIMESTAMP", nn.normalizeFieldValue("TIMESTAMP", this.timestamp));
+ fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments));
+ return fields;
+ }
+
+ }
+
+ public ArticleExtractor() {}
+
+ public Article extract(Reader reader) {
+ XMLInputFactory xmlif = XMLInputFactory.newInstance();
+ xmlif.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.TRUE);
+
+ XMLStreamReader xmlr = null;
+
+ try {
+ xmlr = xmlif.createXMLStreamReader(reader);
+ } catch (XMLStreamException e1) {
+ throw new RuntimeException(e1);
+ }
+
+ QName titleName = QName.valueOf("title");
+ QName textName = QName.valueOf("text");
+ QName revisionName = QName.valueOf("revision");
+ QName timestampName = QName.valueOf("timestamp");
+ QName commentName = QName.valueOf("comment");
+ QName idName = QName.valueOf("id");
+
+ Map<QName,StringBuilder> tags = new HashMap<QName,StringBuilder>();
+ for (QName tag : new QName[] {titleName, textName, timestampName, commentName, idName}) {
+ tags.put(tag, new StringBuilder());
+ }
+
+ StringBuilder articleText = tags.get(textName);
+ StringBuilder titleText = tags.get(titleName);
+ StringBuilder timestampText = tags.get(timestampName);
+ StringBuilder commentText = tags.get(commentName);
+ StringBuilder idText = tags.get(idName);
+
+ StringBuilder current = null;
+ boolean inRevision = false;
+ while (true) {
+ try {
+ if (!xmlr.hasNext())
+ break;
+ xmlr.next();
+ } catch (XMLStreamException e) {
+ throw new RuntimeException(e);
+ }
+ QName currentName = null;
+ if (xmlr.hasName()) {
+ currentName = xmlr.getName();
+ }
+ if (xmlr.isStartElement() && tags.containsKey(currentName)) {
+ if (!inRevision || (!currentName.equals(revisionName) && !currentName.equals(idName))) {
+ current = tags.get(currentName);
+ current.setLength(0);
+ }
+ } else if (xmlr.isStartElement() && currentName.equals(revisionName)) {
+ inRevision = true;
+ } else if (xmlr.isEndElement() && currentName.equals(revisionName)) {
+ inRevision = false;
+ } else if (xmlr.isEndElement() && current != null) {
+ if (textName.equals(currentName)) {
+
+ String title = titleText.toString();
+ String text = articleText.toString();
+ String comment = commentText.toString();
+ int id = Integer.parseInt(idText.toString());
+ long timestamp;
+ try {
+ timestamp = dateFormat.parse(timestampText.append("+0000").toString()).getTime();
+ return new Article(id, title, timestamp, comment, text);
+ } catch (ParseException e) {
+ return null;
+ }
+ }
+ current = null;
+ } else if (current != null && xmlr.hasText()) {
+ current.append(xmlr.getText());
+ }
+ }
+ return null;
+ }
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package ingest;
import java.io.IOException;
@@ -31,113 +31,120 @@ import org.apache.accumulo.core.client.C
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
-
public class WikipediaConfiguration {
- public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
- public final static String USER = "wikipedia.accumulo.user";
- public final static String PASSWORD = "wikipedia.accumulo.password";
- public final static String TABLE_NAME = "wikipedia.accumulo.table";
-
- public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
-
- public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
- public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
- public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
-
- public final static String ANALYZER = "wikipedia.index.analyzer";
-
- public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
-
- public static String getUser(Configuration conf) { return conf.get(USER); };
-
- public static byte[] getPassword(Configuration conf) {
- String pass = conf.get(PASSWORD);
- if (pass == null) {
- return null;
- }
- return pass.getBytes();
- }
-
- public static String getTableName(Configuration conf) {
- String tablename = conf.get(TABLE_NAME);
- if (tablename == null) {
- throw new RuntimeException("No data table name specified in " + TABLE_NAME);
- }
- return tablename;
- }
-
- public static String getInstanceName(Configuration conf) { return conf.get(INSTANCE_NAME); }
-
- public static String getZookeepers(Configuration conf) {
- String zookeepers = conf.get(ZOOKEEPERS);
- if (zookeepers == null) {
- throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
- }
- return zookeepers;
- }
-
- public static Path getNamespacesFile(Configuration conf) {
- String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
- return new Path(filename);
- }
- public static Path getLanguagesFile(Configuration conf) {
- String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
- return new Path(filename);
- }
- public static Path getWorkingDirectory(Configuration conf) {
- String filename = conf.get(WORKING_DIRECTORY);
- return new Path(filename);
- }
-
- public static Analyzer getAnalyzer(Configuration conf) throws IOException {
- Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
- return ReflectionUtils.newInstance(analyzerClass, conf);
- }
-
- public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
- return new Connector(getInstance(conf), getUser(conf), getPassword(conf));
- }
-
- public static Instance getInstance(Configuration conf) {
- return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
- }
-
- public static int getNumPartitions(Configuration conf) {
- return conf.getInt(NUM_PARTITIONS, 25);
- }
-
- /**
- * Helper method to get properties from Hadoop configuration
- * @param <T>
- * @param conf
- * @param propertyName
- * @param resultClass
- * @throws IllegalArgumentException if property is not defined, null, or empty. Or if resultClass is not handled.
- * @return value of property
- */
- @SuppressWarnings("unchecked")
- public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
- String p = conf.get(propertyName);
- if (StringUtils.isEmpty(p))
- throw new IllegalArgumentException(propertyName + " must be specified");
-
- if (resultClass.equals(String.class))
- return (T) p;
- else if (resultClass.equals(String[].class))
- return (T) conf.getStrings(propertyName);
- else if (resultClass.equals(Boolean.class))
- return (T) Boolean.valueOf(p);
- else if (resultClass.equals(Long.class))
- return (T) Long.valueOf(p);
- else if (resultClass.equals(Integer.class))
- return (T) Integer.valueOf(p);
- else if (resultClass.equals(Float.class))
- return (T) Float.valueOf(p);
- else if (resultClass.equals(Double.class))
- return (T) Double.valueOf(p);
- else
- throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
-
- }
-
+ // Hadoop configuration keys for the Accumulo connection; read by getInstanceName, getUser,
+ // getPassword and getTableName respectively.
+ public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
+ public final static String USER = "wikipedia.accumulo.user";
+ public final static String PASSWORD = "wikipedia.accumulo.password";
+ public final static String TABLE_NAME = "wikipedia.accumulo.table";
+
+ // Key read by getZookeepers; passed to ZooKeeperInstance in getInstance.
+ public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
+
+ // Keys for the namespaces/languages file locations and the working directory that the
+ // default file locations are resolved against (see getNamespacesFile, getLanguagesFile).
+ public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
+ public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
+ public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
+
+ // Key naming the Analyzer implementation class; getAnalyzer falls back to SimpleAnalyzer.
+ public final static String ANALYZER = "wikipedia.index.analyzer";
+
+ // Key for the partition count; getNumPartitions defaults it to 25.
+ public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+
+ /**
+  * Returns the Accumulo user name stored under {@link #USER}.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the value of the {@link #USER} property as returned by {@code conf.get}
+  */
+ public static String getUser(Configuration conf) {
+   return conf.get(USER);
+ }
+
+ /**
+  * Returns the Accumulo password stored under {@link #PASSWORD} as bytes.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the UTF-8 encoding of the configured password, or null if the property is not set
+  */
+ public static byte[] getPassword(Configuration conf) {
+   String pass = conf.get(PASSWORD);
+   if (pass == null) {
+     return null;
+   }
+   // Encode explicitly: the no-arg getBytes() uses the platform default charset, so the same
+   // configured password could yield different bytes on differently configured JVMs.
+   return pass.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+ }
+
+ /**
+  * Returns the data table name stored under {@link #TABLE_NAME}.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the configured table name, never null
+  * @throws RuntimeException
+  *           if the property is not set
+  */
+ public static String getTableName(Configuration conf) {
+   final String configured = conf.get(TABLE_NAME);
+   if (configured != null) {
+     return configured;
+   }
+   throw new RuntimeException("No data table name specified in " + TABLE_NAME);
+ }
+
+ /**
+  * Returns the Accumulo instance name stored under {@link #INSTANCE_NAME}.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the value of the {@link #INSTANCE_NAME} property as returned by {@code conf.get}
+  */
+ public static String getInstanceName(Configuration conf) {
+   final String instanceName = conf.get(INSTANCE_NAME);
+   return instanceName;
+ }
+
+ /**
+  * Returns the ZooKeeper connection string stored under {@link #ZOOKEEPERS}.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the configured value, never null
+  * @throws RuntimeException
+  *           if the property is not set
+  */
+ public static String getZookeepers(Configuration conf) {
+   final String configured = conf.get(ZOOKEEPERS);
+   if (configured != null) {
+     return configured;
+   }
+   throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
+ }
+
+ /**
+  * Returns the location of the namespaces file.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the path stored under {@link #NAMESPACES_FILENAME}, or {@code namespaces.dat} inside the working directory when that property is not set
+  */
+ public static Path getNamespacesFile(Configuration conf) {
+   String filename = conf.get(NAMESPACES_FILENAME);
+   if (filename == null) {
+     // Resolve the working directory only when it is actually needed, so an explicitly
+     // configured file does not also require WORKING_DIRECTORY to be set.
+     filename = new Path(getWorkingDirectory(conf), "namespaces.dat").toString();
+   }
+   return new Path(filename);
+ }
+
+ /**
+  * Returns the location of the languages file.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the path stored under {@link #LANGUAGES_FILENAME}, or {@code languages.txt} inside the working directory when that property is not set
+  */
+ public static Path getLanguagesFile(Configuration conf) {
+   String filename = conf.get(LANGUAGES_FILENAME);
+   if (filename == null) {
+     // Resolve the working directory only when it is actually needed, so an explicitly
+     // configured file does not also require WORKING_DIRECTORY to be set.
+     filename = new Path(getWorkingDirectory(conf), "languages.txt").toString();
+   }
+   return new Path(filename);
+ }
+
+ /**
+  * Returns the ingest working directory stored under {@link #WORKING_DIRECTORY}.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the configured working directory
+  * @throws IllegalArgumentException
+  *           if the property is not set
+  */
+ public static Path getWorkingDirectory(Configuration conf) {
+   String filename = conf.get(WORKING_DIRECTORY);
+   if (filename == null) {
+     // Name the missing key instead of letting Path reject a null string on its own.
+     throw new IllegalArgumentException("No working directory specified in " + WORKING_DIRECTORY);
+   }
+   return new Path(filename);
+ }
+
+ /**
+  * Instantiates the analyzer class named by {@link #ANALYZER}, using {@code SimpleAnalyzer} when the property is not set.
+  *
+  * @param conf
+  *          Hadoop configuration to read from; also handed to {@code ReflectionUtils.newInstance}
+  * @return a new analyzer instance
+  * @throws IOException
+  *           declared for callers; nothing in this body throws it directly
+  */
+ public static Analyzer getAnalyzer(Configuration conf) throws IOException {
+   return ReflectionUtils.newInstance(conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class), conf);
+ }
+
+ /**
+  * Builds a {@code Connector} from the instance, user and password found in the configuration.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return a new connector
+  * @throws AccumuloException
+  *           propagated from the {@code Connector} constructor
+  * @throws AccumuloSecurityException
+  *           propagated from the {@code Connector} constructor
+  */
+ public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+   Instance instance = getInstance(conf);
+   String user = getUser(conf);
+   byte[] password = getPassword(conf);
+   return new Connector(instance, user, password);
+ }
+
+ /**
+  * Creates a {@code ZooKeeperInstance} from the configured instance name and zookeepers.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return a new instance handle
+  * @throws RuntimeException
+  *           from {@link #getZookeepers(Configuration)} if the zookeepers property is not set
+  */
+ public static Instance getInstance(Configuration conf) {
+   String instanceName = getInstanceName(conf);
+   String zookeepers = getZookeepers(conf);
+   return new ZooKeeperInstance(instanceName, zookeepers);
+ }
+
+ /**
+  * Returns the partition count stored under {@link #NUM_PARTITIONS}.
+  *
+  * @param conf
+  *          Hadoop configuration to read from
+  * @return the configured value, with 25 passed to {@code conf.getInt} as the default
+  */
+ public static int getNumPartitions(Configuration conf) {
+   final int defaultPartitions = 25;
+   return conf.getInt(NUM_PARTITIONS, defaultPartitions);
+ }
+
+ /**
+  * Reads a required property from a Hadoop configuration and converts it to the requested type.
+  * Despite its name, this method does not return a boolean "is null" answer; it returns the converted value.
+  *
+  * @param <T>
+  *          the result type; one of String, String[], Boolean, Long, Integer, Float or Double
+  * @param conf
+  *          Hadoop configuration to read from
+  * @param propertyName
+  *          name of the property to look up
+  * @param resultClass
+  *          class object selecting the conversion to apply
+  * @throws IllegalArgumentException
+  *           if the property is null or empty, or if resultClass is not one of the handled types
+  * @return the property value converted to {@code resultClass}
+  */
+ public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
+   String raw = conf.get(propertyName);
+   if (StringUtils.isEmpty(raw)) {
+     throw new IllegalArgumentException(propertyName + " must be specified");
+   }
+
+   final Object converted;
+   if (resultClass.equals(String.class)) {
+     converted = raw;
+   } else if (resultClass.equals(String[].class)) {
+     converted = conf.getStrings(propertyName);
+   } else if (resultClass.equals(Boolean.class)) {
+     converted = Boolean.valueOf(raw);
+   } else if (resultClass.equals(Long.class)) {
+     converted = Long.valueOf(raw);
+   } else if (resultClass.equals(Integer.class)) {
+     converted = Integer.valueOf(raw);
+   } else if (resultClass.equals(Float.class)) {
+     converted = Float.valueOf(raw);
+   } else if (resultClass.equals(Double.class)) {
+     converted = Double.valueOf(raw);
+   } else {
+     throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
+   }
+   // Each branch produced an instance of resultClass (or null), so the checked cast cannot fail.
+   return resultClass.cast(converted);
+ }
+
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package ingest;
import java.io.IOException;
@@ -54,158 +54,156 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.iterators.aggregation.conf.AggregatorConfiguration;
public class WikipediaIngester extends Configured implements Tool {
-
- public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
- public final static String SPLIT_FILE = "wikipedia.split_file";
- public final static String TABLE_NAME = "wikipedia.table";
-
-
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new WikipediaIngester(), args);
- System.exit(res);
- }
-
- private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException {
- //Create the shard table
- String indexTableName = tableName + "Index";
- String reverseIndexTableName = tableName + "ReverseIndex";
- String metadataTableName = tableName + "Metadata";
-
- //create the shard table
- if (!tops.exists(tableName)) {
- // Set a text index aggregator on the given field names. No aggregator is set if the option is not supplied
- String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME;
-
- if (textIndexFamilies.length() > 0) {
- System.out.println("Adding content aggregator on the fields: " + textIndexFamilies);
-
- // Create and set the aggregators in one shot
- List<AggregatorConfiguration> aggregators = new ArrayList<AggregatorConfiguration>();
-
- for (String family : StringUtils.split(textIndexFamilies, ',')) {
- aggregators.add(new AggregatorConfiguration(new Text("fi\0" + family), aggregator.TextIndexAggregator.class.getName()));
- }
-
- tops.create(tableName);
- tops.addAggregators(tableName, aggregators);
- } else {
- tops.create(tableName);
- }
-
- // Set the locality group for the full content column family
- tops.setLocalityGroups(tableName,
- Collections.singletonMap("WikipediaDocuments",
- Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY))));
-
- }
-
- if (!tops.exists(indexTableName)) {
- tops.create(indexTableName);
- //Add the UID aggregator
- for (IteratorScope scope : IteratorScope.values()) {
- String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator");
- tops.setProperty(indexTableName, stem, "19,iterator.TotalAggregatingIterator");
- stem += ".opt.";
- tops.setProperty(indexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator");
-
- }
- }
-
- if (!tops.exists(reverseIndexTableName)) {
- tops.create(reverseIndexTableName);
- //Add the UID aggregator
- for (IteratorScope scope : IteratorScope.values()) {
- String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator");
- tops.setProperty(reverseIndexTableName, stem, "19,iterator.TotalAggregatingIterator");
- stem += ".opt.";
- tops.setProperty(reverseIndexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator");
-
- }
- }
-
- if (!tops.exists(metadataTableName)) {
- //Add the NumSummation aggregator for the frequency column
- List<AggregatorConfiguration> aggregators = new ArrayList<AggregatorConfiguration>();
- aggregators.add(new AggregatorConfiguration(new Text("f"), NumSummation.class.getName()));
- tops.create(metadataTableName);
- tops.addAggregators(metadataTableName, aggregators);
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- Job job = new Job(getConf(), "Ingest Wikipedia");
- Configuration conf = job.getConfiguration();
-
-
- String tablename = WikipediaConfiguration.getTableName(conf);
-
- String zookeepers = WikipediaConfiguration.getZookeepers(conf);
- String instanceName = WikipediaConfiguration.getInstanceName(conf);
-
- String user = WikipediaConfiguration.getUser(conf);
- byte[] password = WikipediaConfiguration.getPassword(conf);
- Connector connector = WikipediaConfiguration.getConnector(conf);
-
- TableOperations tops = connector.tableOperations();
-
- createTables(tops, tablename);
-
- configureJob(job);
-
- List<Path> inputPaths = new ArrayList<Path>();
- SortedSet<String> languages = new TreeSet<String>();
- FileSystem fs = FileSystem.get(conf);
- Path parent = new Path(conf.get("wikipedia.input"));
- listFiles(parent, fs, inputPaths, languages);
-
- System.out.println("Input files in " + parent + ":" + inputPaths.size());
- Path[] inputPathsArray = new Path[inputPaths.size()];
- inputPaths.toArray(inputPathsArray);
-
- System.out.println("Languages:" + languages.size());
-
- FileInputFormat.setInputPaths(job, inputPathsArray);
-
- job.setMapperClass(WikipediaMapper.class);
- job.setNumReduceTasks(0);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Mutation.class);
- job.setOutputFormatClass(AccumuloOutputFormat.class);
- AccumuloOutputFormat.setOutputInfo(job, user, password, true, tablename);
- AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, zookeepers);
-
- return job.waitForCompletion(true) ? 0 : 1;
- }
-
- public final static PathFilter partFilter = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().startsWith("part");
- };
- };
-
- protected void configureJob(Job job) {
- Configuration conf = job.getConfiguration();
- job.setJarByClass(WikipediaIngester.class);
- job.setInputFormatClass(WikipediaInputFormat.class);
- conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
- conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
- }
-
- protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
- protected void listFiles(Path path, FileSystem fs, List<Path> files, Set<String> languages) throws IOException {
- for (FileStatus status : fs.listStatus(path)) {
- if (status.isDir()) {
- listFiles(status.getPath(), fs, files, languages);
- } else {
- Path p = status.getPath();
- Matcher matcher = filePattern.matcher(p.getName());
- if (matcher.matches()) {
- languages.add(matcher.group(1));
- files.add(p);
- }
- }
- }
- }
+
+ // Configuration property names. None of the three is referenced by the methods of this class
+ // shown in this diff.
+ // NOTE(review): run() obtains the table name through WikipediaConfiguration.getTableName, which
+ // reads "wikipedia.accumulo.table", not the "wikipedia.table" key declared here - confirm which
+ // key is intended.
+ public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
+ public final static String SPLIT_FILE = "wikipedia.split_file";
+ public final static String TABLE_NAME = "wikipedia.table";
+
+ /**
+  * Command-line entry point: runs this tool through {@code ToolRunner} and exits the JVM with the tool's return code.
+  *
+  * @param args
+  *          command-line arguments, passed through to {@code ToolRunner.run}
+  * @throws Exception
+  *           anything thrown by {@code ToolRunner.run}
+  */
+ public static void main(String[] args) throws Exception {
+   System.exit(ToolRunner.run(new Configuration(), new WikipediaIngester(), args));
+ }
+
+ /**
+  * Creates the shard table and its companion index, reverse index and metadata tables, skipping any that already exist.
+  *
+  * @param tops
+  *          table operations used to check for, create and configure the tables
+  * @param tableName
+  *          name of the shard table; the companion tables are named by appending "Index", "ReverseIndex" and "Metadata"
+  */
+ private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
+     TableExistsException {
+   String indexTableName = tableName + "Index";
+   String reverseIndexTableName = tableName + "ReverseIndex";
+   String metadataTableName = tableName + "Metadata";
+
+   // create the shard table
+   if (!tops.exists(tableName)) {
+     // Set a text index aggregator on the given field names. No aggregator is set if the field list is empty
+     String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME;
+
+     if (textIndexFamilies.length() > 0) {
+       System.out.println("Adding content aggregator on the fields: " + textIndexFamilies);
+
+       // Create and set the aggregators in one shot
+       List<AggregatorConfiguration> aggregators = new ArrayList<AggregatorConfiguration>();
+
+       for (String family : StringUtils.split(textIndexFamilies, ',')) {
+         aggregators.add(new AggregatorConfiguration(new Text("fi\0" + family), aggregator.TextIndexAggregator.class.getName()));
+       }
+
+       tops.create(tableName);
+       tops.addAggregators(tableName, aggregators);
+     } else {
+       tops.create(tableName);
+     }
+
+     // Set the locality group for the full content column family
+     tops.setLocalityGroups(tableName, Collections.singletonMap("WikipediaDocuments", Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY))));
+   }
+
+   // The index and reverse index tables are configured identically
+   createIndexTable(tops, indexTableName);
+   createIndexTable(tops, reverseIndexTableName);
+
+   if (!tops.exists(metadataTableName)) {
+     // Add the NumSummation aggregator for the frequency column
+     List<AggregatorConfiguration> aggregators = new ArrayList<AggregatorConfiguration>();
+     aggregators.add(new AggregatorConfiguration(new Text("f"), NumSummation.class.getName()));
+     tops.create(metadataTableName);
+     tops.addAggregators(metadataTableName, aggregators);
+   }
+ }
+
+ /**
+  * Creates one index table, if it does not exist yet, and registers the UID aggregator on it for every iterator scope.
+  */
+ private void createIndexTable(TableOperations tops, String indexName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
+     TableExistsException {
+   if (tops.exists(indexName)) {
+     return;
+   }
+   tops.create(indexName);
+   // Add the UID aggregator
+   for (IteratorScope scope : IteratorScope.values()) {
+     String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator");
+     tops.setProperty(indexName, stem, "19,iterator.TotalAggregatingIterator");
+     tops.setProperty(indexName, stem + ".opt.*", "aggregator.GlobalIndexUidAggregator");
+   }
+ }
+
+ /**
+  * Creates the Accumulo tables if needed, then configures and runs the map-only ingest job over the files found under the "wikipedia.input" directory.
+  *
+  * @param args
+  *          unused; all settings are read from the job configuration
+  * @return 0 if the job completed successfully, 1 otherwise
+  * @throws IllegalArgumentException
+  *           if "wikipedia.input" is not set
+  */
+ @Override
+ public int run(String[] args) throws Exception {
+   Job job = new Job(getConf(), "Ingest Wikipedia");
+   Configuration conf = job.getConfiguration();
+
+   String tablename = WikipediaConfiguration.getTableName(conf);
+
+   String zookeepers = WikipediaConfiguration.getZookeepers(conf);
+   String instanceName = WikipediaConfiguration.getInstanceName(conf);
+
+   String user = WikipediaConfiguration.getUser(conf);
+   byte[] password = WikipediaConfiguration.getPassword(conf);
+
+   // Validate the input location before connecting, so a missing setting is reported by name
+   // and no tables are created for a job that cannot run.
+   String inputDir = conf.get("wikipedia.input");
+   if (inputDir == null) {
+     throw new IllegalArgumentException("No input directory specified in wikipedia.input");
+   }
+
+   Connector connector = WikipediaConfiguration.getConnector(conf);
+
+   TableOperations tops = connector.tableOperations();
+
+   createTables(tops, tablename);
+
+   configureJob(job);
+
+   List<Path> inputPaths = new ArrayList<Path>();
+   SortedSet<String> languages = new TreeSet<String>();
+   FileSystem fs = FileSystem.get(conf);
+   Path parent = new Path(inputDir);
+   listFiles(parent, fs, inputPaths, languages);
+
+   System.out.println("Input files in " + parent + ":" + inputPaths.size());
+   Path[] inputPathsArray = inputPaths.toArray(new Path[inputPaths.size()]);
+
+   System.out.println("Languages:" + languages.size());
+
+   FileInputFormat.setInputPaths(job, inputPathsArray);
+
+   // Map-only job: mutations go straight from the mapper to Accumulo
+   job.setMapperClass(WikipediaMapper.class);
+   job.setNumReduceTasks(0);
+   job.setMapOutputKeyClass(Text.class);
+   job.setMapOutputValueClass(Mutation.class);
+   job.setOutputFormatClass(AccumuloOutputFormat.class);
+   AccumuloOutputFormat.setOutputInfo(job, user, password, true, tablename);
+   AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, zookeepers);
+
+   return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ /** Path filter that accepts only paths whose name (as returned by {@code Path.getName()}) starts with "part". */
+ public final static PathFilter partFilter = new PathFilter() {
+   @Override
+   public boolean accept(Path path) {
+     return path.getName().startsWith("part");
+   }
+ };
+
+ /**
+  * Applies the job settings specific to this ingester: the job jar, the input format, and the start/end tokens read by {@code AggregatingRecordReader}.
+  *
+  * @param job
+  *          the job to configure
+  */
+ protected void configureJob(Job job) {
+   job.setJarByClass(WikipediaIngester.class);
+   job.setInputFormatClass(WikipediaInputFormat.class);
+
+   // Records are delimited by the <page> ... </page> element
+   Configuration jobConf = job.getConfiguration();
+   jobConf.set(AggregatingRecordReader.START_TOKEN, "<page>");
+   jobConf.set(AggregatingRecordReader.END_TOKEN, "</page>");
+ }
+
+ // Matches file names ending in ".xml" or ".xml.bz2"; group 1 is the leading run of [a-z_]
+ // characters, which listFiles records as the language. The dots are escaped so that only a
+ // literal '.' is accepted before "xml" and "bz2".
+ protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*\\.xml(\\.bz2)?");
+
+ /**
+  * Recursively collects the files under {@code path} whose names match {@code filePattern}.
+  *
+  * @param path
+  *          directory to list
+  * @param fs
+  *          file system used for the listing
+  * @param files
+  *          receives every matching file path
+  * @param languages
+  *          receives group 1 of the pattern match for every matching file
+  * @throws IOException
+  *           if listing a directory fails
+  */
+ protected void listFiles(Path path, FileSystem fs, List<Path> files, Set<String> languages) throws IOException {
+   FileStatus[] entries = fs.listStatus(path);
+   for (int i = 0; i < entries.length; i++) {
+     FileStatus entry = entries[i];
+     Path entryPath = entry.getPath();
+     if (entry.isDir()) {
+       // Descend into subdirectories
+       listFiles(entryPath, fs, files, languages);
+       continue;
+     }
+     Matcher nameMatch = filePattern.matcher(entryPath.getName());
+     if (!nameMatch.matches()) {
+       continue;
+     }
+     languages.add(nameMatch.group(1));
+     files.add(entryPath);
+   }
+ }
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package ingest;
import org.apache.hadoop.fs.Path;
@@ -28,15 +28,15 @@ import org.apache.hadoop.mapreduce.lib.i
import reader.AggregatingRecordReader;
public class WikipediaInputFormat extends TextInputFormat {
-
- @Override
- public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
- return new AggregatingRecordReader();
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- return false;
- }
-
+
+ /**
+  * Supplies a fresh {@code AggregatingRecordReader} for each split; neither argument is consulted here.
+  *
+  * @param split
+  *          the input split (unused)
+  * @param context
+  *          the task attempt context (unused)
+  * @return a new record reader
+  */
+ @Override
+ public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+   RecordReader<LongWritable,Text> reader = new AggregatingRecordReader();
+   return reader;
+ }
+
+ /**
+  * Declares every input file unsplittable, regardless of the arguments.
+  *
+  * @param context
+  *          the job context (unused)
+  * @param file
+  *          the candidate file (unused)
+  * @return always false
+  */
+ @Override
+ protected boolean isSplitable(JobContext context, Path file) {
+   // NOTE(review): presumably so that a single reader sees each record whole - confirm.
+   return false;
+ }
+
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
/**
*
*/
@@ -65,200 +65,195 @@ import org.apache.accumulo.core.security
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-public class WikipediaMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-
- private static final Logger log = Logger.getLogger(WikipediaMapper.class);
-
- public final static Charset UTF8 = Charset.forName("UTF-8");
- public static final String DOCUMENT_COLUMN_FAMILY = "d";
- public static final String METADATA_EVENT_COLUMN_FAMILY = "e";
- public static final String METADATA_INDEX_COLUMN_FAMILY = "i";
- public static final String TOKENS_FIELD_NAME = "TEXT";
-
- private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
- private static final Value NULL_VALUE = new Value(new byte[0]);
- private static final String cvPrefix = "all|";
-
-
- private ArticleExtractor extractor;
- private String language;
- private int numPartitions = 0;
- private Set<?> stopwords = null;
- private ColumnVisibility cv = null;
-
-
- private Text tablename = null;
- private Text indexTableName = null;
- private Text reverseIndexTableName = null;
- private Text metadataTableName = null;
-
-
- @Override
- public void setup(Context context) {
- Configuration conf = context.getConfiguration();
- tablename = new Text(WikipediaConfiguration.getTableName(conf));
- indexTableName = new Text(tablename + "Index");
- reverseIndexTableName = new Text(tablename + "ReverseIndex");
- metadataTableName = new Text(tablename + "Metadata");
-
- FileSplit split = (FileSplit) context.getInputSplit();
- String fileName = split.getPath().getName();
- Matcher matcher = languagePattern.matcher(fileName);
- if (matcher.matches()) {
- language = matcher.group(1).replace('_', '-').toLowerCase();
- if (language.equals("arwiki"))
- stopwords = ArabicAnalyzer.getDefaultStopSet();
- else if (language.equals("brwiki"))
- stopwords = BrazilianAnalyzer.getDefaultStopSet();
- else if (language.startsWith("zh"))
- stopwords = CJKAnalyzer.getDefaultStopSet();
- else if (language.equals("dewiki"))
- stopwords = GermanAnalyzer.getDefaultStopSet();
- else if (language.equals("elwiki"))
- stopwords = GreekAnalyzer.getDefaultStopSet();
- else if (language.equals("fawiki"))
- stopwords = PersianAnalyzer.getDefaultStopSet();
- else if (language.equals("frwiki"))
- stopwords = FrenchAnalyzer.getDefaultStopSet();
- else if (language.equals("nlwiki"))
- stopwords = DutchAnalyzer.getDefaultStopSet();
- else
- stopwords = StopAnalyzer.ENGLISH_STOP_WORDS_SET;
-
- } else {
- throw new RuntimeException("Unknown ingest language! " + fileName);
- }
- extractor = new ArticleExtractor();
- numPartitions = WikipediaConfiguration.getNumPartitions(conf);
- cv = new ColumnVisibility(cvPrefix + language);
-
- }
-
- /**
- * We will partition the documents based on the document id
- * @param article
- * @param numPartitions
- * @return
- * @throws IllegalFormatException
- */
- public static int getPartitionId(Article article, int numPartitions) throws IllegalFormatException {
- return article.getId() % numPartitions;
- }
-
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
- String NULL_BYTE = "\u0000";
- String colfPrefix = language+NULL_BYTE;
- String indexPrefix = "fi"+NULL_BYTE;
- if (article != null) {
- Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions)));
-
- //Create the mutations for the document.
- //Row is partition id, colf is language0articleid, colq is fieldName\0fieldValue
- Mutation m = new Mutation(partitionId);
- for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
- m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp() , NULL_VALUE);
- //Create mutations for the metadata table.
- Mutation mm = new Mutation(entry.getKey());
- mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
- context.write(metadataTableName, mm);
- }
-
- //Tokenize the content
- Set<String> tokens = getTokens(article);
-
- //We are going to put the fields to be indexed into a multimap. This allows us to iterate
- //over the entire set once.
- Multimap<String,String> indexFields = HashMultimap.create();
- //Add the normalized field values
- LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer();
- for (Entry<String,String> index : article.getNormalizedFieldValues().entrySet())
- indexFields.put(index.getKey(), index.getValue());
- //Add the tokens
- for (String token : tokens)
- indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token));
-
-
- for (Entry<String,String> index : indexFields.entries()) {
- //Create mutations for the in partition index
- //Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id
- m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp() , NULL_VALUE);
-
-
- //Create mutations for the global index
- //Create a UID object for the Value
- Builder uidBuilder = Uid.List.newBuilder();
- uidBuilder.setIGNORE(false);
- uidBuilder.setCOUNT(1);
- uidBuilder.addUID(Integer.toString(article.getId()));
- Uid.List uidList = uidBuilder.build();
- Value val = new Value(uidList.toByteArray());
-
- //Create mutations for the global index
- //Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object
- Mutation gm = new Mutation(index.getValue());
- gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp() , val);
- context.write(indexTableName, gm);
-
- //Create mutations for the global reverse index
- Mutation grm = new Mutation(StringUtils.reverse(index.getValue()));
- grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp() , val);
- context.write(reverseIndexTableName, grm);
-
- //Create mutations for the metadata table.
- Mutation mm = new Mutation(index.getKey());
- mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
- context.write(metadataTableName, mm);
-
-
- }
- //Add the entire text to the document section of the table.
- //row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
- m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(),
- new Value(Base64.encodeBytes(article.getText().getBytes(), Base64.GZIP).getBytes()));
- context.write(tablename, m);
-
- } else {
- context.getCounter("wikipedia", "invalid articles").increment(1);
- }
- context.progress();
- }
-
- /**
- * Tokenize the wikipedia content
- *
- * @param article
- * @return
- * @throws IOException
- */
- private Set<String> getTokens(Article article) throws IOException {
- Set<String> tokenList = new HashSet<String>();
- WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
- TermAttribute term = tok.addAttribute(TermAttribute.class);
- StopFilter filter = new StopFilter(false, tok, stopwords, true);
- try {
- while (filter.incrementToken()) {
- String token = term.term();
- if (!StringUtils.isEmpty(token))
- tokenList.add(token);
- }
- } catch (IOException e) {
- log.error("Error tokenizing text", e);
- } finally {
- try {
- tok.end();
- } catch (IOException e) {
- log.error("Error calling end()", e);
- } finally {
- try {
- tok.close();
- } catch (IOException e) {
- log.error("Error closing tokenizer", e);
- }
- }
- }
- return tokenList;
- }
-
+public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
+
+ private static final Logger log = Logger.getLogger(WikipediaMapper.class);
+
+ public final static Charset UTF8 = Charset.forName("UTF-8");
+ public static final String DOCUMENT_COLUMN_FAMILY = "d";
+ public static final String METADATA_EVENT_COLUMN_FAMILY = "e";
+ public static final String METADATA_INDEX_COLUMN_FAMILY = "i";
+ public static final String TOKENS_FIELD_NAME = "TEXT";
+
+ private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
+ private static final Value NULL_VALUE = new Value(new byte[0]);
+ private static final String cvPrefix = "all|";
+
+ private ArticleExtractor extractor;
+ private String language;
+ private int numPartitions = 0;
+ private Set<?> stopwords = null;
+ private ColumnVisibility cv = null;
+
+ private Text tablename = null;
+ private Text indexTableName = null;
+ private Text reverseIndexTableName = null;
+ private Text metadataTableName = null;
+
+ @Override
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ tablename = new Text(WikipediaConfiguration.getTableName(conf));
+ indexTableName = new Text(tablename + "Index");
+ reverseIndexTableName = new Text(tablename + "ReverseIndex");
+ metadataTableName = new Text(tablename + "Metadata");
+
+ FileSplit split = (FileSplit) context.getInputSplit();
+ String fileName = split.getPath().getName();
+ Matcher matcher = languagePattern.matcher(fileName);
+ if (matcher.matches()) {
+ language = matcher.group(1).replace('_', '-').toLowerCase();
+ if (language.equals("arwiki"))
+ stopwords = ArabicAnalyzer.getDefaultStopSet();
+ else if (language.equals("brwiki"))
+ stopwords = BrazilianAnalyzer.getDefaultStopSet();
+ else if (language.startsWith("zh"))
+ stopwords = CJKAnalyzer.getDefaultStopSet();
+ else if (language.equals("dewiki"))
+ stopwords = GermanAnalyzer.getDefaultStopSet();
+ else if (language.equals("elwiki"))
+ stopwords = GreekAnalyzer.getDefaultStopSet();
+ else if (language.equals("fawiki"))
+ stopwords = PersianAnalyzer.getDefaultStopSet();
+ else if (language.equals("frwiki"))
+ stopwords = FrenchAnalyzer.getDefaultStopSet();
+ else if (language.equals("nlwiki"))
+ stopwords = DutchAnalyzer.getDefaultStopSet();
+ else
+ stopwords = StopAnalyzer.ENGLISH_STOP_WORDS_SET;
+
+ } else {
+ throw new RuntimeException("Unknown ingest language! " + fileName);
+ }
+ extractor = new ArticleExtractor();
+ numPartitions = WikipediaConfiguration.getNumPartitions(conf);
+ cv = new ColumnVisibility(cvPrefix + language);
+
+ }
+
+ /**
+ * We will partition the documents based on the document id
+ *
+ * @param article
+ * @param numPartitions
+ * @return
+ * @throws IllegalFormatException
+ */
+ public static int getPartitionId(Article article, int numPartitions) throws IllegalFormatException {
+ return article.getId() % numPartitions;
+ }
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
+ String NULL_BYTE = "\u0000";
+ String colfPrefix = language + NULL_BYTE;
+ String indexPrefix = "fi" + NULL_BYTE;
+ if (article != null) {
+ Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions)));
+
+ // Create the mutations for the document.
+ // Row is partition id, colf is language0articleid, colq is fieldName\0fieldValue
+ Mutation m = new Mutation(partitionId);
+ for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
+ m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
+ // Create mutations for the metadata table.
+ Mutation mm = new Mutation(entry.getKey());
+ mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
+ context.write(metadataTableName, mm);
+ }
+
+ // Tokenize the content
+ Set<String> tokens = getTokens(article);
+
+ // We are going to put the fields to be indexed into a multimap. This allows us to iterate
+ // over the entire set once.
+ Multimap<String,String> indexFields = HashMultimap.create();
+ // Add the normalized field values
+ LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer();
+ for (Entry<String,String> index : article.getNormalizedFieldValues().entrySet())
+ indexFields.put(index.getKey(), index.getValue());
+ // Add the tokens
+ for (String token : tokens)
+ indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token));
+
+ for (Entry<String,String> index : indexFields.entries()) {
+ // Create mutations for the in partition index
+ // Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id
+ m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE);
+
+ // Create mutations for the global index
+ // Create a UID object for the Value
+ Builder uidBuilder = Uid.List.newBuilder();
+ uidBuilder.setIGNORE(false);
+ uidBuilder.setCOUNT(1);
+ uidBuilder.addUID(Integer.toString(article.getId()));
+ Uid.List uidList = uidBuilder.build();
+ Value val = new Value(uidList.toByteArray());
+
+ // Create mutations for the global index
+ // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object
+ Mutation gm = new Mutation(index.getValue());
+ gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
+ context.write(indexTableName, gm);
+
+ // Create mutations for the global reverse index
+ Mutation grm = new Mutation(StringUtils.reverse(index.getValue()));
+ grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
+ context.write(reverseIndexTableName, grm);
+
+ // Create mutations for the metadata table.
+ Mutation mm = new Mutation(index.getKey());
+ mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
+ context.write(metadataTableName, mm);
+
+ }
+ // Add the entire text to the document section of the table.
+ // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
+ m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(),
+ new Value(Base64.encodeBytes(article.getText().getBytes(), Base64.GZIP).getBytes()));
+ context.write(tablename, m);
+
+ } else {
+ context.getCounter("wikipedia", "invalid articles").increment(1);
+ }
+ context.progress();
+ }
+
+ /**
+ * Tokenize the wikipedia content
+ *
+ * @param article
+ * @return
+ * @throws IOException
+ */
+ private Set<String> getTokens(Article article) throws IOException {
+ Set<String> tokenList = new HashSet<String>();
+ WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
+ TermAttribute term = tok.addAttribute(TermAttribute.class);
+ StopFilter filter = new StopFilter(false, tok, stopwords, true);
+ try {
+ while (filter.incrementToken()) {
+ String token = term.term();
+ if (!StringUtils.isEmpty(token))
+ tokenList.add(token);
+ }
+ } catch (IOException e) {
+ log.error("Error tokenizing text", e);
+ } finally {
+ try {
+ tok.end();
+ } catch (IOException e) {
+ log.error("Error calling end()", e);
+ } finally {
+ try {
+ tok.close();
+ } catch (IOException e) {
+ log.error("Error closing tokenizer", e);
+ }
+ }
+ }
+ return tokenList;
+ }
+
}