You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/03/19 17:47:42 UTC
svn commit: r1302537 - in
/incubator/accumulo/branches/1.4/src/examples/wikisearch: ./
ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/
Author: ecn
Date: Mon Mar 19 16:47:41 2012
New Revision: 1302537
URL: http://svn.apache.org/viewvc?rev=1302537&view=rev
Log:
ACCUMULO-471 document the ability to run over uncompressed data; allow the input to be split, don't send millions of duplicate metadata table entries
Modified:
incubator/accumulo/branches/1.4/src/examples/wikisearch/README
incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/README
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/README?rev=1302537&r1=1302536&r2=1302537&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/README (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/README Mon Mar 19 16:47:41 2012
@@ -11,7 +11,10 @@
1. Accumulo, Hadoop, and ZooKeeper must be installed and running
2. One or more wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
You will want to grab the files with the link name of pages-articles.xml.bz2
-
+ 3. Though not strictly required, the ingest will go more quickly if the files are decompressed:
+
+ $ bunzip2 < enwiki-*-pages-articles.xml.bz2 | hadoop fs -put - /wikipedia/enwiki-pages-articles.xml
+
INSTRUCTIONS
------------
@@ -70,4 +73,4 @@
log4j.logger.org.apache.accumulo.examples.wikisearch.iterator=INFO,A1
This needs to be propagated to all the tablet server nodes, and accumulo needs to be restarted.
-
\ No newline at end of file
+
Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java?rev=1302537&r1=1302536&r2=1302537&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java Mon Mar 19 16:47:41 2012
@@ -133,10 +133,4 @@ public class WikipediaInputFormat extend
public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new AggregatingRecordReader();
}
-
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- return false;
- }
-
}
Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java?rev=1302537&r1=1302536&r2=1302537&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java Mon Mar 19 16:47:41 2012
@@ -119,6 +119,8 @@ public class WikipediaMapper extends Map
return article.getId() % numPartitions;
}
+ static HashSet<String> metadataSent = new HashSet<String>();
+
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
@@ -137,9 +139,13 @@ public class WikipediaMapper extends Map
for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
// Create mutations for the metadata table.
- Mutation mm = new Mutation(entry.getKey());
- mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
- context.write(metadataTableName, mm);
+ String metadataKey = entry.getKey() + METADATA_EVENT_COLUMN_FAMILY + language;
+ if (!metadataSent.contains(metadataKey)) {
+ Mutation mm = new Mutation(entry.getKey());
+ mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
+ context.write(metadataTableName, mm);
+ metadataSent.add(metadataKey);
+ }
}
// Tokenize the content
@@ -182,10 +188,13 @@ public class WikipediaMapper extends Map
context.write(reverseIndexTableName, grm);
// Create mutations for the metadata table.
- Mutation mm = new Mutation(index.getKey());
- mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
- context.write(metadataTableName, mm);
-
+ String metadataKey = index.getKey() + METADATA_INDEX_COLUMN_FAMILY + language;
+ if (!metadataSent.contains(metadataKey)) {
+ Mutation mm = new Mutation(index.getKey());
+ mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
+ context.write(metadataTableName, mm);
+ metadataSent.add(metadataKey);
+ }
}
// Add the entire text to the document section of the table.
// row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document