You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by uj...@apache.org on 2014/03/19 17:08:18 UTC

[08/50] [abbrv] git commit: ACCUMULO-375

ACCUMULO-375

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1243961 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/0e1e67db
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/0e1e67db
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/0e1e67db

Branch: refs/heads/master
Commit: 0e1e67dba93a200297da959823526d2264a85eab
Parents: b4f3087
Author: Adam Fuchs <af...@apache.org>
Authored: Tue Feb 14 14:46:37 2012 +0000
Committer: Adam Fuchs <af...@apache.org>
Committed: Tue Feb 14 14:46:37 2012 +0000

----------------------------------------------------------------------
 .../ingest/WikipediaPartitionedIngester.java    | 30 ++++++++++++++------
 .../output/SortingRFileOutputFormat.java        |  6 +++-
 2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/0e1e67db/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
index ca9af6a..bcdee43 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
@@ -58,9 +59,12 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
 
 public class WikipediaPartitionedIngester extends Configured implements Tool {
-  
+
+  private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class);
+
   public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
   public final static String SPLIT_FILE = "wikipedia.split_file";
   public final static String TABLE_NAME = "wikipedia.table";
@@ -150,7 +154,7 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
     return 0;
   }
   
-  public int runPartitionerJob() throws Exception
+  private int runPartitionerJob() throws Exception
   {
     Job partitionerJob = new Job(getConf(), "Partition Wikipedia");
     Configuration partitionerConf = partitionerJob.getConfiguration();
@@ -191,7 +195,7 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
     return partitionerJob.waitForCompletion(true) ? 0 : 1;
   }
   
-  public int runIngestJob() throws Exception
+  private int runIngestJob() throws Exception
   {
     Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia");
     Configuration ingestConf = ingestJob.getConfiguration();
@@ -221,6 +225,16 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
     
     if(WikipediaConfiguration.bulkIngest(ingestConf))
     {
+      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
+      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
+      String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf);
+      if(bulkIngestDir == null)
+      {
+        log.error("Bulk ingest dir not set");
+        return 1;
+      }
+      SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
+    } else {
       ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
       String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
       String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
@@ -228,16 +242,12 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
       byte[] password = WikipediaConfiguration.getPassword(ingestConf);
       AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
       AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
-    } else {
-      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
-      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
-      SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
     }
     
     return ingestJob.waitForCompletion(true) ? 0 : 1;
   }
   
-  public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
+  private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
   {
     Configuration conf = getConf();
 
@@ -253,7 +263,9 @@ public class WikipediaPartitionedIngester extends Configured implements Tool {
       if(status.isDir() == false)
         continue;
       Path dir = status.getPath();
-      connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory+"/"+dir.getName(), true);
+      Path failPath = new Path(failureDirectory+"/"+dir.getName());
+      fs.mkdirs(failPath);
+      connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true);
     }
     
     return 0;

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/0e1e67db/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
index f556287..d8c57c2 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
@@ -4,6 +4,7 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -12,9 +13,12 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {
-  
+
+  private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
+
   public static final String PATH_NAME = "sortingrfileoutputformat.path";
   public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";