Posted to commits@accumulo.apache.org by uj...@apache.org on 2014/03/19 17:08:19 UTC

[09/50] [abbrv] git commit: ACCUMULO-375 hybridized ingest to use some bulk and some streaming

ACCUMULO-375 hybridized ingest to use some bulk and some streaming

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1245142 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/ec56d2d4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/ec56d2d4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/ec56d2d4

Branch: refs/heads/master
Commit: ec56d2d429ebf4bd849daf71804021536f4d21ee
Parents: 0e1e67d
Author: Adam Fuchs <af...@apache.org>
Authored: Thu Feb 16 19:54:31 2012 +0000
Committer: Adam Fuchs <af...@apache.org>
Committed: Thu Feb 16 19:54:31 2012 +0000

----------------------------------------------------------------------
 .../ingest/WikipediaPartitionedMapper.java        | 18 +++++++++++++++---
 .../output/BufferingRFileRecordWriter.java        |  9 +++++----
 2 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
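
The two files below split the ingest between a streaming path and a bulk path: WikipediaPartitionedMapper now pushes its index, reverse-index, and metadata mutations through a live MultiTableBatchWriter, while BufferingRFileRecordWriter keeps buffering the remaining output into RFiles for bulk import. A minimal sketch of that split follows, under loudly-labeled assumptions: the HybridIngestMapper class, the write() helper, and the streamedTables set are illustrative inventions; only MultiTableBatchWriter, Mutation, Text, and context.write() actually appear in the commit.

import java.util.Set;

import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HybridIngestMapper extends Mapper<Text,Text,Text,Mutation> {
  private MultiTableBatchWriter mtbw;  // created in setup(), as in the diff below
  private Set<String> streamedTables;  // hypothetical: names of the streamed tables

  // Route one mutation: a live BatchWriter for the small, latency-sensitive
  // tables; context.write() for everything destined for bulk-loaded RFiles.
  protected void write(Context context, Text table, Mutation m) throws Exception {
    if (streamedTables.contains(table.toString()))
      mtbw.getBatchWriter(table.toString()).addMutation(m);  // streaming path
    else
      context.write(table, m);                               // bulk path
  }
}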


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ec56d2d4/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
index 25bf572..7816b03 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
@@ -26,6 +26,9 @@ import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
@@ -112,6 +115,7 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
     }
   }
   
+  MultiTableBatchWriter mtbw;
 
   @Override
   public void setup(final Context context) {
@@ -121,6 +125,14 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
     reverseIndexTableName = new Text(tablename + "ReverseIndex");
     metadataTableName = new Text(tablename + "Metadata");
     
+    try {
+      mtbw = WikipediaConfiguration.getConnector(conf).createMultiTableBatchWriter(10000000, 1000, 10);
+    } catch (AccumuloException e) {
+      throw new RuntimeException(e);
+    } catch (AccumuloSecurityException e) {
+      throw new RuntimeException(e);
+    }
+    
     final Text metadataTableNameFinal = metadataTableName;
     final Text indexTableNameFinal = indexTableName;
     final Text reverseIndexTableNameFinal = reverseIndexTableName;
@@ -163,7 +175,7 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
           Mutation m = new Mutation(key.row);
           m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
           try {
-            context.write(indexTableNameFinal, m);
+            mtbw.getBatchWriter(indexTableNameFinal.toString()).addMutation(m);
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
@@ -189,7 +201,7 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
           Mutation m = new Mutation(key.row);
           m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
           try {
-            context.write(reverseIndexTableNameFinal, m);
+            mtbw.getBatchWriter(reverseIndexTableNameFinal.toString()).addMutation(m);
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
@@ -210,7 +222,7 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutatio
             Mutation m = new Mutation(key.row);
             m.put(key.colfam, key.colqual, key.cv, key.timestamp, value);
             try {
-              context.write(metadataTableNameFinal, m);
+              mtbw.getBatchWriter(metadataTableNameFinal.toString()).addMutation(m);
             } catch (Exception e) {
               throw new RuntimeException(e);
             }
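
One thing the hunks above do not show is a matching teardown: mtbw is created in setup() but never flushed or closed in this commit, so mutations still buffered on the client could be lost when the task ends (unless a later change handles it). Below is a sketch of the cleanup() that would typically pair with this setup; MultiTableBatchWriter.close() and MutationsRejectedException are the real client API, while wrapping the failure in an IOException is just one reasonable choice, not something this commit does.

import java.io.IOException;

import org.apache.accumulo.core.client.MutationsRejectedException;

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  try {
    mtbw.close();  // flushes and closes every per-table BatchWriter
  } catch (MutationsRejectedException e) {
    // constraint violations or permission failures reported by the tablet servers
    throw new IOException(e);
  }
}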

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/ec56d2d4/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
index a7e7dcf..579bbe1 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
@@ -69,8 +69,8 @@ final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> {
     if (buffer.size() == 0)
       return;
     
-    // TODO fix the filename
     String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf";
+    // TODO get the table configuration for the given table?
     FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf);
     
     // forget locality groups for now, just write everything to the default
@@ -110,17 +110,18 @@ final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> {
     {
       Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted());
       Value v = new Value(update.getValue());
+      // TODO account for object overhead
       mutationSize += k.getSize();
       mutationSize += v.getSize();
       buffer.put(k, v);
     }
     size += mutationSize;
     long bufferSize = bufferSizes.get(table);
+    
+    // TODO use a MutableLong instead
     bufferSize += mutationSize;
     bufferSizes.put(table, bufferSize);
-    
-    // TODO add object overhead size
-    
+
     while (size >= maxSize) {
       flushLargestTable();
     }
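
On the new "use a MutableLong instead" TODO above: bufferSizes appears to be a Map<Text,Long>, so each update costs a get(), an unbox, a re-box, and a second lookup for the put(). A mutable holder creates the entry once per table and updates it in place. A sketch using commons-lang's MutableLong follows; the TableSizeTracker class is illustrative, and whether the commons-lang jar is on the wikisearch classpath is an assumption.

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.io.Text;

final class TableSizeTracker {
  private final Map<Text,MutableLong> bufferSizes = new HashMap<Text,MutableLong>();

  void add(Text table, long mutationSize) {
    MutableLong size = bufferSizes.get(table);
    if (size == null) {
      size = new MutableLong(0L);
      bufferSizes.put(new Text(table), size);  // copy the key: Text is mutable
    }
    size.add(mutationSize);  // updated in place: no autoboxing, no second put()
  }

  long get(Text table) {
    MutableLong size = bufferSizes.get(table);
    return size == null ? 0L : size.longValue();
  }
}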