Posted to commits@accumulo.apache.org by uj...@apache.org on 2014/03/19 17:08:17 UTC

[07/50] [abbrv] git commit: ACCUMULO-381 added a bulk ingest option for wikisearch ingest

ACCUMULO-381 added a bulk ingest option for wikisearch ingest

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1243506 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/b4f30879
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/b4f30879
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/b4f30879

Branch: refs/heads/master
Commit: b4f30879d903e29a240680f33db7b716e9ac7145
Parents: 72b7221
Author: Adam Fuchs <af...@apache.org>
Authored: Mon Feb 13 13:49:12 2012 +0000
Committer: Adam Fuchs <af...@apache.org>
Committed: Mon Feb 13 13:49:12 2012 +0000

----------------------------------------------------------------------
 ingest/conf/wikipedia_parallel.xml.example      |  16 +++
 .../ingest/WikipediaConfiguration.java          |  22 +++-
 .../ingest/WikipediaPartitionedIngester.java    |  56 ++++++--
 .../output/BufferingRFileRecordWriter.java      | 129 +++++++++++++++++++
 .../output/SortingRFileOutputFormat.java        | 103 +++++++++++++++
 5 files changed, 314 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/conf/wikipedia_parallel.xml.example
----------------------------------------------------------------------
diff --git a/ingest/conf/wikipedia_parallel.xml.example b/ingest/conf/wikipedia_parallel.xml.example
index cf20f01..53220f0 100644
--- a/ingest/conf/wikipedia_parallel.xml.example
+++ b/ingest/conf/wikipedia_parallel.xml.example
@@ -56,4 +56,20 @@
     <name>wikipedia.run.ingest</name>
     <value><!--whether to run the ingest step --></value>
   </property>
+  <property>
+    <name>wikipedia.bulk.ingest</name>
+    <value><!--whether to use bulk ingest instead of streaming ingest --></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.dir</name>
+    <value><!--the directory to store rfiles for bulk ingest --></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.failure.dir</name>
+    <value><!--the directory to store failed rfiles after bulk ingest --></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.buffer.size</name>
+    <value><!--the amount of memory to use for buffering and sorting key/value pairs in each mapper before writing rfiles --></value>
+  </property>
 </configuration>
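
The four new wikipedia.bulk.ingest.* properties switch the ingest job from streaming mutations into Accumulo to writing sorted rfiles that are bulk-imported afterwards. For reference only, a minimal sketch (not part of this commit) of applying the same settings programmatically on a Hadoop Configuration; the directory paths are hypothetical placeholders and 1L << 28 (256 MB) matches the default buffer size used by WikipediaConfiguration.bulkIngestBufferSize:

    import org.apache.hadoop.conf.Configuration;

    public class BulkIngestConfigSketch {
      public static Configuration configure() {
        Configuration conf = new Configuration();
        // switch from streaming ingest to bulk ingest
        conf.setBoolean("wikipedia.bulk.ingest", true);
        // hypothetical HDFS directories for generated rfiles and failed imports
        conf.set("wikipedia.bulk.ingest.dir", "/wikisearch/bulk");
        conf.set("wikipedia.bulk.ingest.failure.dir", "/wikisearch/bulk-failures");
        // per-mapper buffer for sorting key/value pairs before an rfile is written
        conf.setLong("wikipedia.bulk.ingest.buffer.size", 1L << 28);
        return conf;
      }
    }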

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
index 5a0aad4..a84d90c 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
@@ -52,6 +52,10 @@ public class WikipediaConfiguration {
   
   public final static String RUN_PARTITIONER = "wikipedia.run.partitioner";
   public final static String RUN_INGEST = "wikipedia.run.ingest";
+  public final static String BULK_INGEST = "wikipedia.bulk.ingest";
+  public final static String BULK_INGEST_DIR = "wikipedia.bulk.ingest.dir";
+  public final static String BULK_INGEST_FAILURE_DIR = "wikipedia.bulk.ingest.failure.dir";
+  public final static String BULK_INGEST_BUFFER_SIZE = "wikipedia.bulk.ingest.buffer.size";
   
   
   public static String getUser(Configuration conf) {
@@ -134,6 +138,22 @@ public class WikipediaConfiguration {
     return conf.getBoolean(RUN_INGEST, true);
   }
 
+  public static boolean bulkIngest(Configuration conf) {
+    return conf.getBoolean(BULK_INGEST, true);
+  }
+
+  public static String bulkIngestDir(Configuration conf) {
+    return conf.get(BULK_INGEST_DIR);
+  }
+
+  public static String bulkIngestFailureDir(Configuration conf) {
+    return conf.get(BULK_INGEST_FAILURE_DIR);
+  }
+  
+  public static long bulkIngestBufferSize(Configuration conf) {
+    return conf.getLong(BULK_INGEST_BUFFER_SIZE, 1L << 28);
+  }
+
   /**
    * Helper method to get properties from Hadoop configuration
    * 
@@ -169,5 +189,5 @@ public class WikipediaConfiguration {
       throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
     
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
index 5571290..ca9af6a 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.iterators.user.SummingCombiner;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
+import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -53,7 +54,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -140,7 +140,13 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
         return result;
     }
     if(WikipediaConfiguration.runIngest(conf))
-      return runIngestJob();
+    {
+      int result = runIngestJob();
+      if(result != 0)
+        return result;
+      if(WikipediaConfiguration.bulkIngest(conf))
+        return loadBulkFiles();
+    }
     return 0;
   }
   
@@ -195,11 +201,6 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
     
     String tablename = WikipediaConfiguration.getTableName(ingestConf);
     
-    String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
-    String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
-    
-    String user = WikipediaConfiguration.getUser(ingestConf);
-    byte[] password = WikipediaConfiguration.getPassword(ingestConf);
     Connector connector = WikipediaConfiguration.getConnector(ingestConf);
     
     TableOperations tops = connector.tableOperations();
@@ -217,13 +218,47 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
     // setup output format
     ingestJob.setMapOutputKeyClass(Text.class);
     ingestJob.setMapOutputValueClass(Mutation.class);
-    ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
-    AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+    
+    if(WikipediaConfiguration.bulkIngest(ingestConf))
+    {
+      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
+      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
+      SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
+    } else {
+      ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
+      String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
+      String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
+      String user = WikipediaConfiguration.getUser(ingestConf);
+      byte[] password = WikipediaConfiguration.getPassword(ingestConf);
+      AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
+      AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+    }
     
     return ingestJob.waitForCompletion(true) ? 0 : 1;
   }
   
+  public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
+  {
+    Configuration conf = getConf();
+
+    Connector connector = WikipediaConfiguration.getConnector(conf);
+    
+    FileSystem fs = FileSystem.get(conf);
+    String directory = WikipediaConfiguration.bulkIngestDir(conf);
+    
+    String failureDirectory = WikipediaConfiguration.bulkIngestFailureDir(conf);
+    
+    for(FileStatus status: fs.listStatus(new Path(directory)))
+    {
+      if(!status.isDir())
+        continue;
+      Path dir = status.getPath();
+      connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory+"/"+dir.getName(), true);
+    }
+    
+    return 0;
+  }
+  
   public final static PathFilter partFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -241,7 +276,6 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
 
   protected void configureIngestJob(Job job) {
     job.setJarByClass(WikipediaPartitionedIngester.class);
-    job.setInputFormatClass(WikipediaInputFormat.class);
   }
   
   protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
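
When wikipedia.bulk.ingest is true, run() now follows the ingest job with loadBulkFiles(), which calls importDirectory on each per-table subdirectory under the bulk ingest directory. For context, a minimal sketch (not part of this commit) of driving the whole flow through Hadoop's ToolRunner, assuming the wikipedia.* properties are supplied via -conf or -D options:

    import org.apache.accumulo.examples.wikisearch.ingest.WikipediaPartitionedIngester;
    import org.apache.hadoop.util.ToolRunner;

    public class RunPartitionedIngestSketch {
      public static void main(String[] args) throws Exception {
        // e.g. args = { "-conf", "wikipedia_parallel.xml" }
        // runs the partitioner and ingest jobs, then bulk-loads the generated
        // rfiles when wikipedia.bulk.ingest is set
        int rc = ToolRunner.run(new WikipediaPartitionedIngester(), args);
        System.exit(rc);
      }
    }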

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
new file mode 100644
index 0000000..a7e7dcf
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
@@ -0,0 +1,129 @@
+package org.apache.accumulo.examples.wikisearch.output;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> {
+  private final long maxSize;
+  private final AccumuloConfiguration acuconf;
+  private final Configuration conf;
+  private final String filenamePrefix;
+  private final String taskID;
+  private final FileSystem fs;
+  private int fileCount = 0;
+  private long size;
+  
+  private Map<Text,TreeMap<Key,Value>> buffers = new HashMap<Text,TreeMap<Key,Value>>();
+  private Map<Text,Long> bufferSizes = new HashMap<Text,Long>();
+
+  private TreeMap<Key,Value> getBuffer(Text tablename)
+  {
+    TreeMap<Key,Value> buffer = buffers.get(tablename);
+    if(buffer == null)
+    {
+      buffer = new TreeMap<Key,Value>();
+      buffers.put(tablename, buffer);
+      bufferSizes.put(tablename, 0l);
+    }
+    return buffer;
+  }
+  
+  private Text getLargestTablename()
+  {
+    long max = 0;
+    Text table = null;
+    for(Entry<Text,Long> e:bufferSizes.entrySet())
+    {
+      if(e.getValue() > max)
+      {
+        max = e.getValue();
+        table = e.getKey();
+      }
+    }
+    return table;
+  }
+  
+  private void flushLargestTable() throws IOException
+  {
+    Text tablename = getLargestTablename();
+    if(tablename == null)
+      return;
+    long bufferSize = bufferSizes.get(tablename);
+    TreeMap<Key,Value> buffer = buffers.get(tablename);
+    if (buffer.size() == 0)
+      return;
+    
+    // TODO fix the filename
+    String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf";
+    FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf);
+    
+    // forget locality groups for now, just write everything to the default
+    writer.startDefaultLocalityGroup();
+    
+    for (Entry<Key,Value> e : buffer.entrySet()) {
+      writer.append(e.getKey(), e.getValue());
+    }
+    
+    writer.close();
+    
+    size -= bufferSize;
+    buffer.clear();
+    bufferSizes.put(tablename, 0l);
+  }
+  
+  BufferingRFileRecordWriter(long maxSize, AccumuloConfiguration acuconf, Configuration conf, String filenamePrefix, String taskID, FileSystem fs) {
+    this.maxSize = maxSize;
+    this.acuconf = acuconf;
+    this.conf = conf;
+    this.filenamePrefix = filenamePrefix;
+    this.taskID = taskID;
+    this.fs = fs;
+  }
+  
+  @Override
+  public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+    while(size > 0)
+      flushLargestTable();
+  }
+  
+  @Override
+  public void write(Text table, Mutation mutation) throws IOException, InterruptedException {
+    TreeMap<Key,Value> buffer = getBuffer(table);
+    int mutationSize = 0;
+    for(ColumnUpdate update: mutation.getUpdates())
+    {
+      Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted());
+      Value v = new Value(update.getValue());
+      mutationSize += k.getSize();
+      mutationSize += v.getSize();
+      buffer.put(k, v);
+    }
+    size += mutationSize;
+    long bufferSize = bufferSizes.get(table);
+    bufferSize += mutationSize;
+    bufferSizes.put(table, bufferSize);
+    
+    // TODO add object overhead size
+    
+    while (size >= maxSize) {
+      flushLargestTable();
+    }
+  }
+  
+}
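
Each Mutation handed to this writer is exploded into Key/Value pairs on the mutation's row and buffered in a per-table TreeMap; once the combined buffer size reaches the configured maximum, the largest table's buffer is written out as a sorted rfile. A small illustration (an assumption for context, not code from this commit) of the kind of input the writer receives; the table name "wikiIndex" and the row/column values are hypothetical:

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;

    public class BufferingWriteSketch {
      // builds a sample mutation; passing it to
      // BufferingRFileRecordWriter.write(new Text("wikiIndex"), m) would buffer
      // one Key/Value pair under the "wikiIndex" table until the next flush
      public static Mutation sampleMutation() {
        Mutation m = new Mutation(new Text("Article:Accumulo"));
        m.put(new Text("d"), new Text("wiki"), new Value("page text".getBytes()));
        return m;
      }
    }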

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
new file mode 100644
index 0000000..f556287
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
@@ -0,0 +1,103 @@
+package org.apache.accumulo.examples.wikisearch.output;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {
+  
+  public static final String PATH_NAME = "sortingrfileoutputformat.path";
+  public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";
+  
+  public static void setPathName(Configuration conf, String path) {
+    conf.set(PATH_NAME, path);
+  }
+  
+  public static String getPathName(Configuration conf) {
+    return conf.get(PATH_NAME);
+  }
+  
+  public static void setMaxBufferSize(Configuration conf, long maxBufferSize) {
+    conf.setLong(MAX_BUFFER_SIZE, maxBufferSize);
+  }
+  
+  public static long getMaxBufferSize(Configuration conf) {
+    return conf.getLong(MAX_BUFFER_SIZE, -1);
+  }
+  
+  @Override
+  public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException {
+    // TODO make sure the path is writable?
+    // TODO make sure the max buffer size is set and is reasonable
+  }
+  
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException, InterruptedException {
+    return new OutputCommitter() {
+      
+      @Override
+      public void setupTask(TaskAttemptContext arg0) throws IOException {
+        // no per-task setup; the record writer creates rfiles as buffers are flushed
+        
+      }
+      
+      @Override
+      public void setupJob(JobContext arg0) throws IOException {
+        // no job-level setup; output directories are created as rfiles are written
+        
+      }
+      
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+        // rfiles are written directly to their final location, so no commit step
+        return false;
+      }
+      
+      @Override
+      public void commitTask(TaskAttemptContext arg0) throws IOException {
+        // nothing to commit; see needsTaskCommit
+        
+      }
+      
+      @Override
+      public void cleanupJob(JobContext arg0) throws IOException {
+        // nothing to clean up
+        
+      }
+      
+      @Override
+      public void abortTask(TaskAttemptContext arg0) throws IOException {
+        // nothing to roll back
+        
+      }
+    };
+  }
+  
+  @Override
+  public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException, InterruptedException {
+    
+    // grab the configuration
+    final Configuration conf = attempt.getConfiguration();
+    // create a filename
+    final String filenamePrefix = getPathName(conf);
+    final String taskID = attempt.getTaskAttemptID().toString();
+    // grab the max size
+    final long maxSize = getMaxBufferSize(conf);
+    // grab the FileSystem
+    final FileSystem fs = FileSystem.get(conf);
+    // create a default AccumuloConfiguration
+    final AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
+    
+    return new BufferingRFileRecordWriter(maxSize, acuconf, conf, filenamePrefix, taskID, fs);
+  }
+  
+}
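
SortingRFileOutputFormat is what the ingest job uses in bulk mode. A minimal sketch (not part of this commit) of wiring it into a map-only job that emits table name / Mutation pairs; the output path and buffer size are hypothetical placeholders:

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class SortingRFileJobSketch {
      public static Job newJob(Configuration conf) throws Exception {
        // rfiles land under <path>/<tablename>/<taskid>_<n>.rf
        SortingRFileOutputFormat.setPathName(conf, "/wikisearch/bulk");
        SortingRFileOutputFormat.setMaxBufferSize(conf, 1L << 28);
        Job job = new Job(conf, "sorting rfile sketch");
        job.setOutputFormatClass(SortingRFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);       // table name
        job.setMapOutputValueClass(Mutation.class); // buffered and sorted by the record writer
        return job;
      }
    }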