You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by uj...@apache.org on 2014/03/19 17:08:18 UTC
[08/50] [abbrv] git commit: ACCUMULO-375
ACCUMULO-375
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1243961 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/0e1e67db
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/0e1e67db
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/0e1e67db
Branch: refs/heads/master
Commit: 0e1e67dba93a200297da959823526d2264a85eab
Parents: b4f3087
Author: Adam Fuchs <af...@apache.org>
Authored: Tue Feb 14 14:46:37 2012 +0000
Committer: Adam Fuchs <af...@apache.org>
Committed: Tue Feb 14 14:46:37 2012 +0000
----------------------------------------------------------------------
.../ingest/WikipediaPartitionedIngester.java | 30 ++++++++++++++------
.../output/SortingRFileOutputFormat.java | 6 +++-
2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/0e1e67db/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
index ca9af6a..bcdee43 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args;
import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
@@ -58,9 +59,12 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
public class WikipediaPartitionedIngester extends Configured implements Tool {
-
+
+ private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class);
+
public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
public final static String SPLIT_FILE = "wikipedia.split_file";
public final static String TABLE_NAME = "wikipedia.table";
@@ -150,7 +154,7 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
return 0;
}
- public int runPartitionerJob() throws Exception
+ private int runPartitionerJob() throws Exception
{
Job partitionerJob = new Job(getConf(), "Partition Wikipedia");
Configuration partitionerConf = partitionerJob.getConfiguration();
@@ -191,7 +195,7 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
return partitionerJob.waitForCompletion(true) ? 0 : 1;
}
- public int runIngestJob() throws Exception
+ private int runIngestJob() throws Exception
{
Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia");
Configuration ingestConf = ingestJob.getConfiguration();
@@ -221,6 +225,16 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
if(WikipediaConfiguration.bulkIngest(ingestConf))
{
+ ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
+ SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
+ String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf);
+ if(bulkIngestDir == null)
+ {
+ log.error("Bulk ingest dir not set");
+ return 1;
+ }
+ SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
+ } else {
ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
@@ -228,16 +242,12 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
byte[] password = WikipediaConfiguration.getPassword(ingestConf);
AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
- } else {
- ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
- SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
- SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
}
return ingestJob.waitForCompletion(true) ? 0 : 1;
}
- public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
+ private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
{
Configuration conf = getConf();
@@ -253,7 +263,9 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
if(status.isDir() == false)
continue;
Path dir = status.getPath();
- connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory+"/"+dir.getName(), true);
+ Path failPath = new Path(failureDirectory+"/"+dir.getName());
+ fs.mkdirs(failPath);
+ connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true);
}
return 0;
http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/0e1e67db/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
index f556287..d8c57c2 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
@@ -4,6 +4,7 @@ import java.io.IOException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -12,9 +13,12 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {
-
+
+ private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
+
public static final String PATH_NAME = "sortingrfileoutputformat.path";
public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";