You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by uj...@apache.org on 2014/03/19 17:08:17 UTC
[07/50] [abbrv] git commit: ACCUMULO-381 added a bulk ingest option
for wikisearch ingest
ACCUMULO-381 added a bulk ingest option for wikisearch ingest
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1243506 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/b4f30879
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/b4f30879
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/b4f30879
Branch: refs/heads/master
Commit: b4f30879d903e29a240680f33db7b716e9ac7145
Parents: 72b7221
Author: Adam Fuchs <af...@apache.org>
Authored: Mon Feb 13 13:49:12 2012 +0000
Committer: Adam Fuchs <af...@apache.org>
Committed: Mon Feb 13 13:49:12 2012 +0000
----------------------------------------------------------------------
ingest/conf/wikipedia_parallel.xml.example | 16 +++
.../ingest/WikipediaConfiguration.java | 22 +++-
.../ingest/WikipediaPartitionedIngester.java | 56 ++++++--
.../output/BufferingRFileRecordWriter.java | 129 +++++++++++++++++++
.../output/SortingRFileOutputFormat.java | 103 +++++++++++++++
5 files changed, 314 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/conf/wikipedia_parallel.xml.example
----------------------------------------------------------------------
diff --git a/ingest/conf/wikipedia_parallel.xml.example b/ingest/conf/wikipedia_parallel.xml.example
index cf20f01..53220f0 100644
--- a/ingest/conf/wikipedia_parallel.xml.example
+++ b/ingest/conf/wikipedia_parallel.xml.example
@@ -56,4 +56,20 @@
<name>wikipedia.run.ingest</name>
<value><!--whether to run the ingest step --></value>
</property>
+ <property>
+ <name>wikipedia.bulk.ingest</name>
+ <value><!--whether to use bulk ingest instead of streaming ingest --></value>
+ </property>
+ <property>
+ <name>wikipedia.bulk.ingest.dir</name>
+ <value><!--the directory to store rfiles for bulk ingest --></value>
+ </property>
+ <property>
+ <name>wikipedia.bulk.ingest.failure.dir</name>
+ <value><!--the directory to store failed rfiles after bulk ingest --></value>
+ </property>
+ <property>
+ <name>wikipedia.bulk.ingest.buffer.size</name>
+ <value><!--the amount of memory to use for buffering and sorting key/value pairs in each mapper before writing rfiles --></value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
index 5a0aad4..a84d90c 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
@@ -52,6 +52,10 @@ public class WikipediaConfiguration {
public final static String RUN_PARTITIONER = "wikipedia.run.partitioner";
public final static String RUN_INGEST = "wikipedia.run.ingest";
+ public final static String BULK_INGEST = "wikipedia.bulk.ingest";
+ public final static String BULK_INGEST_DIR = "wikipedia.bulk.ingest.dir";
+ public final static String BULK_INGEST_FAILURE_DIR = "wikipedia.bulk.ingest.failure.dir";
+ public final static String BULK_INGEST_BUFFER_SIZE = "wikipedia.bulk.ingest.buffer.size";
public static String getUser(Configuration conf) {
@@ -134,6 +138,22 @@ public class WikipediaConfiguration {
return conf.getBoolean(RUN_INGEST, true);
}
+ public static boolean bulkIngest(Configuration conf) {
+ return conf.getBoolean(BULK_INGEST, true);
+ }
+
+ public static String bulkIngestDir(Configuration conf) {
+ return conf.get(BULK_INGEST_DIR);
+ }
+
+ public static String bulkIngestFailureDir(Configuration conf) {
+ return conf.get(BULK_INGEST_FAILURE_DIR);
+ }
+
+ public static long bulkIngestBufferSize(Configuration conf) {
+ return conf.getLong(BULK_INGEST_BUFFER_SIZE,1l<<28);
+ }
+
/**
* Helper method to get properties from Hadoop configuration
*
@@ -169,5 +189,5 @@ public class WikipediaConfiguration {
throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
index 5571290..ca9af6a 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
+import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -53,7 +54,6 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
@@ -140,7 +140,13 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
return result;
}
if(WikipediaConfiguration.runIngest(conf))
- return runIngestJob();
+ {
+ int result = runIngestJob();
+ if(result != 0)
+ return result;
+ if(WikipediaConfiguration.bulkIngest(conf))
+ return loadBulkFiles();
+ }
return 0;
}
@@ -195,11 +201,6 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
String tablename = WikipediaConfiguration.getTableName(ingestConf);
- String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
- String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
-
- String user = WikipediaConfiguration.getUser(ingestConf);
- byte[] password = WikipediaConfiguration.getPassword(ingestConf);
Connector connector = WikipediaConfiguration.getConnector(ingestConf);
TableOperations tops = connector.tableOperations();
@@ -217,13 +218,47 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
// setup output format
ingestJob.setMapOutputKeyClass(Text.class);
ingestJob.setMapOutputValueClass(Mutation.class);
- ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
- AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
- AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+
+ if(WikipediaConfiguration.bulkIngest(ingestConf))
+ {
+ ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
+ String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
+ String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
+ String user = WikipediaConfiguration.getUser(ingestConf);
+ byte[] password = WikipediaConfiguration.getPassword(ingestConf);
+ AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
+ AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+ } else {
+ ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
+ SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
+ SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
+ }
return ingestJob.waitForCompletion(true) ? 0 : 1;
}
+ public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
+ {
+ Configuration conf = getConf();
+
+ Connector connector = WikipediaConfiguration.getConnector(conf);
+
+ FileSystem fs = FileSystem.get(conf);
+ String directory = WikipediaConfiguration.bulkIngestDir(conf);
+
+ String failureDirectory = WikipediaConfiguration.bulkIngestFailureDir(conf);
+
+ for(FileStatus status: fs.listStatus(new Path(directory)))
+ {
+ if(status.isDir() == false)
+ continue;
+ Path dir = status.getPath();
+ connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory+"/"+dir.getName(), true);
+ }
+
+ return 0;
+ }
+
public final static PathFilter partFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
@@ -241,7 +276,6 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
protected void configureIngestJob(Job job) {
job.setJarByClass(WikipediaPartitionedIngester.class);
- job.setInputFormatClass(WikipediaInputFormat.class);
}
protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
new file mode 100644
index 0000000..a7e7dcf
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
@@ -0,0 +1,129 @@
+package org.apache.accumulo.examples.wikisearch.output;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> {
+ private final long maxSize;
+ private final AccumuloConfiguration acuconf;
+ private final Configuration conf;
+ private final String filenamePrefix;
+ private final String taskID;
+ private final FileSystem fs;
+ private int fileCount = 0;
+ private long size;
+
+ private Map<Text,TreeMap<Key,Value>> buffers = new HashMap<Text,TreeMap<Key,Value>>();
+ private Map<Text,Long> bufferSizes = new HashMap<Text,Long>();
+
+ private TreeMap<Key,Value> getBuffer(Text tablename)
+ {
+ TreeMap<Key,Value> buffer = buffers.get(tablename);
+ if(buffer == null)
+ {
+ buffer = new TreeMap<Key,Value>();
+ buffers.put(tablename, buffer);
+ bufferSizes.put(tablename, 0l);
+ }
+ return buffer;
+ }
+
+ private Text getLargestTablename()
+ {
+ long max = 0;
+ Text table = null;
+ for(Entry<Text,Long> e:bufferSizes.entrySet())
+ {
+ if(e.getValue() > max)
+ {
+ max = e.getValue();
+ table = e.getKey();
+ }
+ }
+ return table;
+ }
+
+ private void flushLargestTable() throws IOException
+ {
+ Text tablename = getLargestTablename();
+ if(tablename == null)
+ return;
+ long bufferSize = bufferSizes.get(tablename);
+ TreeMap<Key,Value> buffer = buffers.get(tablename);
+ if (buffer.size() == 0)
+ return;
+
+ // TODO fix the filename
+ String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf";
+ FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf);
+
+ // forget locality groups for now, just write everything to the default
+ writer.startDefaultLocalityGroup();
+
+ for (Entry<Key,Value> e : buffer.entrySet()) {
+ writer.append(e.getKey(), e.getValue());
+ }
+
+ writer.close();
+
+ size -= bufferSize;
+ buffer.clear();
+ bufferSizes.put(tablename, 0l);
+ }
+
+ BufferingRFileRecordWriter(long maxSize, AccumuloConfiguration acuconf, Configuration conf, String filenamePrefix, String taskID, FileSystem fs) {
+ this.maxSize = maxSize;
+ this.acuconf = acuconf;
+ this.conf = conf;
+ this.filenamePrefix = filenamePrefix;
+ this.taskID = taskID;
+ this.fs = fs;
+ }
+
+ @Override
+ public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+ while(size > 0)
+ flushLargestTable();
+ }
+
+ @Override
+ public void write(Text table, Mutation mutation) throws IOException, InterruptedException {
+ TreeMap<Key,Value> buffer = getBuffer(table);
+ int mutationSize = 0;
+ for(ColumnUpdate update: mutation.getUpdates())
+ {
+ Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted());
+ Value v = new Value(update.getValue());
+ mutationSize += k.getSize();
+ mutationSize += v.getSize();
+ buffer.put(k, v);
+ }
+ size += mutationSize;
+ long bufferSize = bufferSizes.get(table);
+ bufferSize += mutationSize;
+ bufferSizes.put(table, bufferSize);
+
+ // TODO add object overhead size
+
+ while (size >= maxSize) {
+ flushLargestTable();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/b4f30879/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
new file mode 100644
index 0000000..f556287
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
@@ -0,0 +1,103 @@
+package org.apache.accumulo.examples.wikisearch.output;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {
+
+ public static final String PATH_NAME = "sortingrfileoutputformat.path";
+ public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";
+
+ public static void setPathName(Configuration conf, String path) {
+ conf.set(PATH_NAME, path);
+ }
+
+ public static String getPathName(Configuration conf) {
+ return conf.get(PATH_NAME);
+ }
+
+ public static void setMaxBufferSize(Configuration conf, long maxBufferSize) {
+ conf.setLong(MAX_BUFFER_SIZE, maxBufferSize);
+ }
+
+ public static long getMaxBufferSize(Configuration conf) {
+ return conf.getLong(MAX_BUFFER_SIZE, -1);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException {
+ // TODO make sure the path is writable?
+ // TODO make sure the max buffer size is set and is reasonable
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException, InterruptedException {
+ return new OutputCommitter() {
+
+ @Override
+ public void setupTask(TaskAttemptContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setupJob(JobContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void cleanupJob(JobContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext arg0) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+ };
+ }
+
+ @Override
+ public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException, InterruptedException {
+
+ // grab the configuration
+ final Configuration conf = attempt.getConfiguration();
+ // create a filename
+ final String filenamePrefix = getPathName(conf);
+ final String taskID = attempt.getTaskAttemptID().toString();
+ // grab the max size
+ final long maxSize = getMaxBufferSize(conf);
+ // grab the FileSystem
+ final FileSystem fs = FileSystem.get(conf);
+ // create a default AccumuloConfiguration
+ final AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
+
+ return new BufferingRFileRecordWriter(maxSize, acuconf, conf, filenamePrefix, taskID, fs);
+ }
+
+}