Posted to commits@accumulo.apache.org by bi...@apache.org on 2012/03/20 18:49:29 UTC
svn commit: r1303044 - in /incubator/accumulo/trunk: ./ examples/simple/ examples/wikisearch/ examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples...
Author: billie
Date: Tue Mar 20 17:49:28 2012
New Revision: 1303044
URL: http://svn.apache.org/viewvc?rev=1303044&view=rev
Log:
ACCUMULO-469 ACCUMULO-471 ACCUMULO-472 merged to trunk
Modified:
incubator/accumulo/trunk/ (props changed)
incubator/accumulo/trunk/examples/simple/pom.xml
incubator/accumulo/trunk/examples/wikisearch/README
incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
incubator/accumulo/trunk/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputSplitTest.java
Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
Merged /incubator/accumulo/branches/1.4:r1300182-1300276,1300278-1300712,1300714-1301055,1301057-1301470,1301472-1302467,1302469-1302913,1302915-1303019,1303021-1303024
Merged /incubator/accumulo/branches/1.4/src:r1302470-1302913
Modified: incubator/accumulo/trunk/examples/simple/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/examples/simple/pom.xml?rev=1303044&r1=1303043&r2=1303044&view=diff
==============================================================================
--- incubator/accumulo/trunk/examples/simple/pom.xml (original)
+++ incubator/accumulo/trunk/examples/simple/pom.xml Tue Mar 20 17:49:28 2012
@@ -61,6 +61,12 @@
</includes>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <configuration>
+ <outputDirectory>../../lib</outputDirectory>
+ </configuration>
+ </plugin>
</plugins>
</build>
Modified: incubator/accumulo/trunk/examples/wikisearch/README
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/examples/wikisearch/README?rev=1303044&r1=1303043&r2=1303044&view=diff
==============================================================================
--- incubator/accumulo/trunk/examples/wikisearch/README (original)
+++ incubator/accumulo/trunk/examples/wikisearch/README Tue Mar 20 17:49:28 2012
@@ -11,7 +11,10 @@
1. Accumulo, Hadoop, and ZooKeeper must be installed and running
2. One or more wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
You will want to grab the files with the link name of pages-articles.xml.bz2
-
+ 3. Though not strictly required, the ingest will go more quickly if the files are decompressed:
+
+ $ bunzip2 < enwiki-*-pages-articles.xml.bz2 | hadoop fs -put - /wikipedia/enwiki-pages-articles.xml
+
INSTRUCTIONS
------------
@@ -70,4 +73,4 @@
log4j.logger.org.apache.accumulo.examples.wikisearch.iterator=INFO,A1
This needs to be propagated to all the tablet server nodes, and accumulo needs to be restarted.
-
\ No newline at end of file
+
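[Editor's note on the command added in the first hunk: bunzip2 reads the compressed dump on stdin and writes plain XML to stdout, and "hadoop fs -put - <path>" streams stdin directly into HDFS, so the decompressed copy never has to land on local disk. As written, the shell redirect expects the glob to match exactly one file; with several dumps you would run the pipeline once per file.]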
Modified: incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java?rev=1303044&r1=1303043&r2=1303044&view=diff
==============================================================================
--- incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java (original)
+++ incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java Tue Mar 20 17:49:28 2012
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.accumulo.examples.wikisearch.ingest;
import java.util.LinkedHashMap;
@@ -6,14 +22,12 @@ import java.util.Map;
public class LRUOutputCombiner<Key,Value> extends LinkedHashMap<Key,Value> {
private static final long serialVersionUID = 1L;
-
- public static abstract class Fold <Value>
- {
+
+ public static abstract class Fold<Value> {
public abstract Value fold(Value oldValue, Value newValue);
}
- public static abstract class Output<Key,Value>
- {
+ public static abstract class Output<Key,Value> {
public abstract void output(Key key, Value value);
}
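[Editor's note: the hunk above is a formatting-only change to the Fold and Output helpers, so for context, here is a minimal sketch of how an LRU output combiner along these lines can work: a bounded, access-ordered LinkedHashMap that folds values for repeated keys and emits entries through Output as they are evicted. The capacity field, constructor, removeEldestEntry override, and flush() below are illustrative assumptions, not the committed code.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LRUOutputCombinerSketch<K,V> extends LinkedHashMap<K,V> {
      private static final long serialVersionUID = 1L;

      public static abstract class Fold<V> {
        public abstract V fold(V oldValue, V newValue);
      }

      public static abstract class Output<K,V> {
        public abstract void output(K key, V value);
      }

      private final int capacity;        // assumed eviction threshold
      private final Fold<V> fold;
      private final Output<K,V> output;

      public LRUOutputCombinerSketch(int capacity, Fold<V> fold, Output<K,V> output) {
        super(capacity + 1, 1.1f, true); // true = access order, i.e. LRU eviction
        this.capacity = capacity;
        this.fold = fold;
        this.output = output;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
        if (size() > capacity) {
          output.output(eldest.getKey(), eldest.getValue()); // emit on eviction
          return true;
        }
        return false;
      }

      @Override
      public V put(K key, V value) {
        V old = super.get(key);
        if (old != null)
          value = fold.fold(old, value); // combine with the value already buffered
        return super.put(key, value);
      }

      public void flush() {              // emit everything still buffered
        for (Map.Entry<K,V> e : entrySet())
          output.output(e.getKey(), e.getValue());
        clear();
      }
    }]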
Modified: incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java?rev=1303044&r1=1303043&r2=1303044&view=diff
==============================================================================
--- incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java (original)
+++ incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java Tue Mar 20 17:49:28 2012
@@ -133,10 +133,4 @@ public class WikipediaInputFormat extend
public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new AggregatingRecordReader();
}
-
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- return false;
- }
-
}
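[Editor's note: the deleted isSplitable override had pinned every input file to a single mapper. Removing it restores the superclass default, presumably so that large uncompressed dumps can be split across many mappers; this fits the README change above recommending decompressed input.]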
Modified: incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java?rev=1303044&r1=1303043&r2=1303044&view=diff
==============================================================================
--- incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java (original)
+++ incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java Tue Mar 20 17:49:28 2012
@@ -119,6 +119,8 @@ public class WikipediaMapper extends Map
return article.getId() % numPartitions;
}
+ static HashSet<String> metadataSent = new HashSet<String>();
+
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
@@ -137,9 +139,13 @@ public class WikipediaMapper extends Map
for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
// Create mutations for the metadata table.
- Mutation mm = new Mutation(entry.getKey());
- mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
- context.write(metadataTableName, mm);
+ String metadataKey = entry.getKey() + METADATA_EVENT_COLUMN_FAMILY + language;
+ if (!metadataSent.contains(metadataKey)) {
+ Mutation mm = new Mutation(entry.getKey());
+ mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
+ context.write(metadataTableName, mm);
+ metadataSent.add(metadataKey);
+ }
}
// Tokenize the content
@@ -182,10 +188,13 @@ public class WikipediaMapper extends Map
context.write(reverseIndexTableName, grm);
// Create mutations for the metadata table.
- Mutation mm = new Mutation(index.getKey());
- mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
- context.write(metadataTableName, mm);
-
+ String metadataKey = index.getKey() + METADATA_INDEX_COLUMN_FAMILY + language;
+ if (!metadataSent.contains(metadataKey)) {
+ Mutation mm = new Mutation(index.getKey());
+ mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
+ context.write(metadataTableName, mm);
+ metadataSent.add(metadataKey);
+ }
}
// Add the entire text to the document section of the table.
// row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
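[Editor's note: the two hunks above apply the same guard. A metadata mutation for a given key/column-family/language combination is now written once per mapper JVM, tracked in the static metadataSent set, rather than once per article or term. Since the metadata table needs only one entry per combination, this should eliminate a large volume of duplicate writes; being static, the set also persists across map() calls and, with JVM reuse, across tasks.]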
Modified: incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java?rev=1303044&r1=1303043&r2=1303044&view=diff
==============================================================================
--- incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java (original)
+++ incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java Tue Mar 20 17:49:28 2012
@@ -1,10 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.accumulo.examples.wikisearch.output;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.TreeMap;
import java.util.Map.Entry;
+import java.util.TreeMap;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ColumnUpdate;
@@ -31,12 +47,10 @@ final class BufferingRFileRecordWriter e
private Map<Text,TreeMap<Key,Value>> buffers = new HashMap<Text,TreeMap<Key,Value>>();
private Map<Text,Long> bufferSizes = new HashMap<Text,Long>();
-
- private TreeMap<Key,Value> getBuffer(Text tablename)
- {
+
+ private TreeMap<Key,Value> getBuffer(Text tablename) {
TreeMap<Key,Value> buffer = buffers.get(tablename);
- if(buffer == null)
- {
+ if (buffer == null) {
buffer = new TreeMap<Key,Value>();
buffers.put(tablename, buffer);
bufferSizes.put(tablename, 0l);
@@ -44,14 +58,11 @@ final class BufferingRFileRecordWriter e
return buffer;
}
- private Text getLargestTablename()
- {
+ private Text getLargestTablename() {
long max = 0;
Text table = null;
- for(Entry<Text,Long> e:bufferSizes.entrySet())
- {
- if(e.getValue() > max)
- {
+ for (Entry<Text,Long> e : bufferSizes.entrySet()) {
+ if (e.getValue() > max) {
max = e.getValue();
table = e.getKey();
}
@@ -59,10 +70,9 @@ final class BufferingRFileRecordWriter e
return table;
}
- private void flushLargestTable() throws IOException
- {
+ private void flushLargestTable() throws IOException {
Text tablename = getLargestTablename();
- if(tablename == null)
+ if (tablename == null)
return;
long bufferSize = bufferSizes.get(tablename);
TreeMap<Key,Value> buffer = buffers.get(tablename);
@@ -98,7 +108,7 @@ final class BufferingRFileRecordWriter e
@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
- while(size > 0)
+ while (size > 0)
flushLargestTable();
}
@@ -106,9 +116,9 @@ final class BufferingRFileRecordWriter e
public void write(Text table, Mutation mutation) throws IOException, InterruptedException {
TreeMap<Key,Value> buffer = getBuffer(table);
int mutationSize = 0;
- for(ColumnUpdate update: mutation.getUpdates())
- {
- Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted());
+ for (ColumnUpdate update : mutation.getUpdates()) {
+ Key k = new Key(mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(),
+ update.isDeleted());
Value v = new Value(update.getValue());
// TODO account for object overhead
mutationSize += k.getSize();
@@ -121,7 +131,7 @@ final class BufferingRFileRecordWriter e
// TODO use a MutableLong instead
bufferSize += mutationSize;
bufferSizes.put(table, bufferSize);
-
+
while (size >= maxSize) {
flushLargestTable();
}
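[Editor's note: the changes above are formatting plus one wrapped Key constructor call; the logic is unchanged. The writer keeps one sorted TreeMap<Key,Value> buffer per table, tracks an approximate byte count for each, and whenever the total crosses maxSize (and repeatedly on close) flushes whichever table currently holds the most buffered data; the RFile-writing body of flushLargestTable sits outside these hunks. Flushing the largest buffer first frees the most memory per flush and tends to yield fewer, larger files.]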
Modified: incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java?rev=1303044&r1=1303043&r2=1303044&view=diff
==============================================================================
--- incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java (original)
+++ incubator/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java Tue Mar 20 17:49:28 2012
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.accumulo.examples.wikisearch.output;
import java.io.IOException;
@@ -14,9 +30,9 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {
-
+
// private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
-
+
public static final String PATH_NAME = "sortingrfileoutputformat.path";
public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";
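[Editor's note: the two property keys above are the format's public knobs. As a hypothetical driver fragment (the setter calls, job name, path, and 256 MB cap below are assumptions; only the two key constants come from this file), wiring the format into an ingest job might look like:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class IngestDriverSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "wikisearch-ingest");
        job.setOutputFormatClass(SortingRFileOutputFormat.class);
        // Keys come from the constants above; the values are placeholders.
        job.getConfiguration().set(SortingRFileOutputFormat.PATH_NAME, "/wikisearch/rfiles");
        job.getConfiguration().setLong(SortingRFileOutputFormat.MAX_BUFFER_SIZE, 256L * 1024 * 1024);
        // ... input format, mapper, and the rest of the job wiring elided ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }]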
Modified: incubator/accumulo/trunk/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputSplitTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputSplitTest.java?rev=1303044&r1=1303043&r2=1303044&view=diff
==============================================================================
--- incubator/accumulo/trunk/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputSplitTest.java (original)
+++ incubator/accumulo/trunk/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputSplitTest.java Tue Mar 20 17:49:28 2012
@@ -1,9 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.accumulo.examples.wikisearch.ingest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -17,20 +32,19 @@ import org.junit.Test;
public class WikipediaInputSplitTest {
@Test
- public void testSerialization() throws IOException
- {
+ public void testSerialization() throws IOException {
Path testPath = new Path("/foo/bar");
- String [] hosts = new String [2];
+ String[] hosts = new String[2];
hosts[0] = "abcd";
hosts[1] = "efgh";
- FileSplit fSplit = new FileSplit(testPath,1,2,hosts);
- WikipediaInputSplit split = new WikipediaInputSplit(fSplit,7);
+ FileSplit fSplit = new FileSplit(testPath, 1, 2, hosts);
+ WikipediaInputSplit split = new WikipediaInputSplit(fSplit, 7);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
split.write(out);
- out.close();
+ out.close();
baos.close();
-
+
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInput in = new ObjectInputStream(bais);
@@ -45,12 +59,11 @@ public class WikipediaInputSplitTest {
Assert.assertTrue(fSplit.getPath().equals(fSplit2.getPath()));
Assert.assertTrue(fSplit.getStart() == fSplit2.getStart());
Assert.assertTrue(fSplit.getLength() == fSplit2.getLength());
-
- String [] hosts2 = fSplit2.getLocations();
+
+ String[] hosts2 = fSplit2.getLocations();
Assert.assertEquals(hosts.length, hosts2.length);
- for(int i = 0; i < hosts.length; i++)
- {
- Assert.assertEquals(hosts[i],hosts2[i]);
+ for (int i = 0; i < hosts.length; i++) {
+ Assert.assertEquals(hosts[i], hosts2[i]);
}
}
}