Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/04/12 19:55:39 UTC

[accumulo-examples] branch master updated: Update bulkIngest example. Fixes #17 (#20)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new a3cfb34  Update bulkIngest example. Fixes #17 (#20)
a3cfb34 is described below

commit a3cfb3472ec48e76c78806879add29e49a99627c
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Apr 12 15:55:35 2018 -0400

    Update bulkIngest example. Fixes #17 (#20)
---
 docs/bulkIngest.md                                 | 26 +++++++----
 .../examples/mapreduce/bulk/BulkIngestExample.java | 45 +++++++++---------
 .../examples/mapreduce/bulk/GenerateTestData.java  | 53 ----------------------
 .../examples/mapreduce/bulk/SetupTable.java        | 47 +++++++++++--------
 .../examples/mapreduce/bulk/VerifyIngest.java      | 22 ++-------
 .../org/apache/accumulo/examples/ExamplesIT.java   | 27 -----------
 6 files changed, 76 insertions(+), 144 deletions(-)

diff --git a/docs/bulkIngest.md b/docs/bulkIngest.md
index b856d83..5dda1d1 100644
--- a/docs/bulkIngest.md
+++ b/docs/bulkIngest.md
@@ -16,18 +16,28 @@ limitations under the License.
 -->
 # Apache Accumulo Bulk Ingest Example
 
-This is an example of how to bulk ingest data into accumulo using map reduce.
+This is an example of how to bulk ingest data into Accumulo using MapReduce.
+
+This tutorial uses the following Java classes:
+
+ * [SetupTable.java] - creates the table and some data to ingest
+ * [BulkIngestExample.java] - ingests the data using MapReduce
+ * [VerifyIngest.java] - checks that the data was ingested
+ 
+Remember to copy the accumulo-examples-\*.jar to Accumulo's 'lib/ext' directory.
+
+    $ cp target/accumulo-examples-*.jar /path/accumulo/lib/ext
 
 The following commands show how to run this example. This example creates a
 table called test_bulk which has two initial split points. Then 1000 rows of
 test data are created in HDFS. After that the 1000 rows are ingested into
-accumulo. Then we verify the 1000 rows are in accumulo.
+Accumulo. Finally, we verify that the 1000 rows are in Accumulo.
 
     $ PKG=org.apache.accumulo.examples.mapreduce.bulk
-    $ ARGS="-c examples.conf"
-    $ accumulo $PKG.SetupTable $ARGS -t test_bulk row_00000333 row_00000666
-    $ accumulo $PKG.GenerateTestData --start-row 0 --count 1000 --output bulk/test_1.txt
-    $ accumulo-util hadoop-jar target/accumulo-examples-X.Y.Z.jar $PKG.BulkIngestExample $ARGS -t test_bulk --inputDir bulk --workDir tmp/bulkWork
-    $ accumulo $PKG.VerifyIngest $ARGS -t test_bulk --start-row 0 --count 1000
+    $ accumulo $PKG.SetupTable
+    $ accumulo-util hadoop-jar target/accumulo-examples-*.jar $PKG.BulkIngestExample
+    $ ./bin/runex mapreduce.bulk.VerifyIngest
 
-For a high level discussion of bulk ingest, see the docs dir.
+[SetupTable.java]: ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
+[BulkIngestExample.java]: ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
+[VerifyIngest.java]: ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
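
The updated commands above no longer pass "-c examples.conf"; each rewritten
class instead loads conf/accumulo-client.properties through the Connector
builder, as the Java diffs below show. A minimal sketch of that connection
pattern (the class name is illustrative and the properties file is assumed to
exist at that relative path; error handling is elided):

    import org.apache.accumulo.core.client.ConnectionInfo;
    import org.apache.accumulo.core.client.Connector;

    public class ClientPropsConnect {
      public static void main(String[] args) throws Exception {
        // Build a live Connector from the examples' client properties file
        Connector conn = Connector.builder()
            .usingProperties("conf/accumulo-client.properties").build();
        // The same builder can produce a ConnectionInfo for MapReduce job setup
        ConnectionInfo info = Connector.builder()
            .usingProperties("conf/accumulo-client.properties").info();
        System.out.println("connected as " + conn.whoami());
      }
    }
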
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
index 9754243..f87a768 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
@@ -22,11 +22,15 @@ import java.io.PrintStream;
 import java.util.Base64;
 import java.util.Collection;
 
+import org.apache.accumulo.core.client.ConnectionInfo;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.examples.cli.MapReduceClientOnRequiredTable;
 import org.apache.hadoop.conf.Configuration;
@@ -48,7 +52,11 @@ import com.beust.jcommander.Parameter;
 /**
  * Example map reduce job that bulk ingest data into an accumulo table. The expected input is text files containing tab separated key value pairs on each line.
  */
+
 public class BulkIngestExample extends Configured implements Tool {
+  static String workDir = "tmp/bulkWork";
+  static String inputDir = "bulk";
+
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
     private Text outputKey = new Text();
     private Text outputValue = new Text();
@@ -94,18 +102,8 @@ public class BulkIngestExample extends Configured implements Tool {
     }
   }
 
-  static class Opts extends MapReduceClientOnRequiredTable {
-    @Parameter(names = "--inputDir", required = true)
-    String inputDir;
-    @Parameter(names = "--workDir", required = true)
-    String workDir;
-  }
-
   @Override
   public int run(String[] args) {
-    Opts opts = new Opts();
-    opts.parseArgs(BulkIngestExample.class.getName(), args);
-
     Configuration conf = getConf();
     PrintStream out = null;
     try {
@@ -121,17 +119,22 @@ public class BulkIngestExample extends Configured implements Tool {
 
       job.setReducerClass(ReduceClass.class);
       job.setOutputFormatClass(AccumuloFileOutputFormat.class);
-      opts.setAccumuloConfigs(job);
 
-      Connector connector = opts.getConnector();
+      Connector connector = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+      ConnectionInfo connectionInfo = Connector.builder().usingProperties("conf/accumulo-client.properties").info();
+      AccumuloInputFormat.setConnectionInfo(job, connectionInfo);
+      AccumuloInputFormat.setInputTableName(job, SetupTable.tableName);
+      AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
+      AccumuloOutputFormat.setCreateTables(job, true);
+      AccumuloOutputFormat.setDefaultTableName(job, SetupTable.tableName);
 
-      TextInputFormat.setInputPaths(job, new Path(opts.inputDir));
-      AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.workDir + "/files"));
+      TextInputFormat.setInputPaths(job, new Path(inputDir));
+      AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
 
       FileSystem fs = FileSystem.get(conf);
-      out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.workDir + "/splits.txt"))));
+      out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
 
-      Collection<Text> splits = connector.tableOperations().listSplits(opts.getTableName(), 100);
+      Collection<Text> splits = connector.tableOperations().listSplits(SetupTable.tableName, 100);
       for (Text split : splits)
         out.println(Base64.getEncoder().encodeToString(TextUtil.getBytes(split)));
 
@@ -139,16 +142,16 @@ public class BulkIngestExample extends Configured implements Tool {
       out.close();
 
       job.setPartitionerClass(RangePartitioner.class);
-      RangePartitioner.setSplitFile(job, opts.workDir + "/splits.txt");
+      RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
 
       job.waitForCompletion(true);
-      Path failures = new Path(opts.workDir, "failures");
+      Path failures = new Path(workDir, "failures");
       fs.delete(failures, true);
-      fs.mkdirs(new Path(opts.workDir, "failures"));
+      fs.mkdirs(new Path(workDir, "failures"));
       // With HDFS permissions on, we need to make sure the Accumulo user can read/move the rfiles
       FsShell fsShell = new FsShell(conf);
-      fsShell.run(new String[] {"-chmod", "-R", "777", opts.workDir});
-      connector.tableOperations().importDirectory(opts.getTableName(), opts.workDir + "/files", opts.workDir + "/failures", false);
+      fsShell.run(new String[] {"-chmod", "-R", "777", workDir});
+      connector.tableOperations().importDirectory(SetupTable.tableName, workDir + "/files", workDir + "/failures", false);
 
     } catch (Exception e) {
       throw new RuntimeException(e);
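
The hunks above leave the body of MapClass out of view; per the class javadoc,
each input line is a tab-separated key/value pair. A mapper consistent with
that contract might look like the sketch below (illustrative, not necessarily
identical to the example's MapClass):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TabSplitMapper extends Mapper<LongWritable,Text,Text,Text> {
      private final Text outputKey = new Text();
      private final Text outputValue = new Text();

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // Each input line is "row\tvalue"; split at the first tab
        int tab = value.find("\t");
        if (tab < 0)
          return; // skip malformed lines
        outputKey.set(value.getBytes(), 0, tab);
        outputValue.set(value.getBytes(), tab + 1, value.getLength() - tab - 1);
        context.write(outputKey, outputValue);
      }
    }
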
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
deleted file mode 100644
index 4622ea0..0000000
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.mapreduce.bulk;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.beust.jcommander.Parameter;
-
-public class GenerateTestData {
-
-  static class Opts extends org.apache.accumulo.core.cli.Help {
-    @Parameter(names = "--start-row", required = true)
-    int startRow = 0;
-    @Parameter(names = "--count", required = true)
-    int numRows = 0;
-    @Parameter(names = "--output", required = true)
-    String outputFile;
-  }
-
-  public static void main(String[] args) throws IOException {
-    Opts opts = new Opts();
-    opts.parseArgs(GenerateTestData.class.getName(), args);
-
-    FileSystem fs = FileSystem.get(new Configuration());
-    PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.outputFile))));
-
-    for (int i = 0; i < opts.numRows; i++) {
-      out.println(String.format("row_%010d\tvalue_%010d", i + opts.startRow, i + opts.startRow));
-    }
-    out.close();
-  }
-
-}
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
index 17e549d..225be47 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
@@ -16,35 +16,46 @@
  */
 package org.apache.accumulo.examples.mapreduce.bulk;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.io.BufferedOutputStream;
+import java.io.PrintStream;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
-import com.beust.jcommander.Parameter;
 
 public class SetupTable {
 
-  static class Opts extends ClientOnRequiredTable {
-    @Parameter(description = "<split> { <split> ... } ")
-    List<String> splits = new ArrayList<>();
-  }
+  static String[] splits = {"row_00000333", "row_00000666"};
+  static String tableName = "test_bulk";
+  static int numRows = 1000;
+  static String outputFile = "bulk/test_1.txt";
 
   public static void main(String[] args) throws Exception {
-    Opts opts = new Opts();
-    opts.parseArgs(SetupTable.class.getName(), args);
-    Connector conn = opts.getConnector();
-    conn.tableOperations().create(opts.getTableName());
-    if (!opts.splits.isEmpty()) {
-      // create a table with initial partitions
-      TreeSet<Text> intialPartitions = new TreeSet<>();
-      for (String split : opts.splits) {
-        intialPartitions.add(new Text(split));
+    Connector conn = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+    try {
+      conn.tableOperations().create(tableName);
+    } catch (TableExistsException e) {
+      //ignore
+    }
+
+    // add the initial split points to the table
+    TreeSet<Text> initialPartitions = new TreeSet<>();
+    for (String split : splits) {
+      initialPartitions.add(new Text(split));
+    }
+    conn.tableOperations().addSplits(tableName, initialPartitions);
+
+    FileSystem fs = FileSystem.get(new Configuration());
+    try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(outputFile))))) {
+      // create some data in outputFile
+      for (int i = 0; i < numRows; i++) {
+        out.println(String.format("row_%010d\tvalue_%010d", i, i));
       }
-      conn.tableOperations().addSplits(opts.getTableName(), intialPartitions);
     }
   }
 }
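
With GenerateTestData deleted, SetupTable now writes the test data itself.
Given the format string above, bulk/test_1.txt comes out as plain text with one
tab-separated pair per line:

    row_0000000000	value_0000000000
    row_0000000001	value_0000000001
    ...
    row_0000000999	value_0000000999
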
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
index 0dead6d..6fd1318 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
@@ -27,37 +27,25 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.beust.jcommander.Parameter;
-
 public class VerifyIngest {
   private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class);
 
-  static class Opts extends ClientOnRequiredTable {
-    @Parameter(names = "--start-row")
-    int startRow = 0;
-    @Parameter(names = "--count", required = true, description = "number of rows to verify")
-    int numRows = 0;
-  }
-
   public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    Opts opts = new Opts();
-    opts.parseArgs(VerifyIngest.class.getName(), args);
-
-    Connector connector = opts.getConnector();
-    Scanner scanner = connector.createScanner(opts.getTableName(), opts.auths);
+    Connector connector = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+    Scanner scanner = connector.createScanner(SetupTable.tableName, Authorizations.EMPTY);
 
-    scanner.setRange(new Range(new Text(String.format("row_%010d", opts.startRow)), null));
+    scanner.setRange(new Range(String.format("row_%010d", 0), null));
 
     Iterator<Entry<Key,Value>> si = scanner.iterator();
 
     boolean ok = true;
 
-    for (int i = opts.startRow; i < opts.numRows; i++) {
+    for (int i = 0; i < SetupTable.numRows; i++) {
 
       if (si.hasNext()) {
         Entry<Key,Value> entry = si.next();
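
The verification loop's body is mostly elided in the hunk above. Pulled
together into a self-contained sketch (table name and row count inlined from
SetupTable; logging simplified to stdout), the check amounts to:

    import java.util.Iterator;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;

    public class VerifySketch {
      public static void main(String[] args) throws Exception {
        Connector conn = Connector.builder()
            .usingProperties("conf/accumulo-client.properties").build();
        Scanner scanner = conn.createScanner("test_bulk", Authorizations.EMPTY);
        scanner.setRange(new Range(String.format("row_%010d", 0), null));
        Iterator<Entry<Key,Value>> si = scanner.iterator();
        boolean ok = true;
        for (int i = 0; i < 1000; i++) {
          String expected = String.format("row_%010d", i);
          if (si.hasNext()) {
            // Rows were written as %010d-formatted counters by SetupTable
            Entry<Key,Value> entry = si.next();
            if (!entry.getKey().getRow().toString().equals(expected))
              ok = false;
          } else {
            ok = false; // ran out of rows early
            break;
          }
        }
        System.out.println(ok ? "verified 1000 rows" : "verification failed");
      }
    }
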
diff --git a/src/test/java/org/apache/accumulo/examples/ExamplesIT.java b/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
index 7240304..0087eb4 100644
--- a/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
+++ b/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
@@ -77,7 +77,6 @@ import org.apache.accumulo.examples.mapreduce.TableToFile;
 import org.apache.accumulo.examples.mapreduce.TeraSortIngest;
 import org.apache.accumulo.examples.mapreduce.WordCount;
 import org.apache.accumulo.examples.mapreduce.bulk.BulkIngestExample;
-import org.apache.accumulo.examples.mapreduce.bulk.GenerateTestData;
 import org.apache.accumulo.examples.mapreduce.bulk.SetupTable;
 import org.apache.accumulo.examples.mapreduce.bulk.VerifyIngest;
 import org.apache.accumulo.examples.shard.ContinuousQuery;
@@ -404,32 +403,6 @@ public class ExamplesIT extends AccumuloClusterHarness {
   }
 
   @Test
-  public void testBulkIngest() throws Exception {
-    // TODO Figure out a way to run M/R with Kerberos
-    assumeTrue(getAdminToken() instanceof PasswordToken);
-    String tableName = getUniqueNames(1)[0];
-    FileSystem fs = getFileSystem();
-    Path p = new Path(dir, "tmp");
-    if (fs.exists(p)) {
-      fs.delete(p, true);
-    }
-    goodExec(GenerateTestData.class, "--start-row", "0", "--count", "10000", "--output", dir + "/tmp/input/data");
-
-    List<String> commonArgs = new ArrayList<>(Arrays.asList(new String[] {"-c", getConnectionFile(), "--table", tableName}));
-
-    List<String> args = new ArrayList<>(commonArgs);
-    goodExec(SetupTable.class, args.toArray(new String[0]));
-
-    args = new ArrayList<>(commonArgs);
-    args.addAll(Arrays.asList(new String[] {"--inputDir", dir + "/tmp/input", "--workDir", dir + "/tmp"}));
-    goodExec(BulkIngestExample.class, args.toArray(new String[0]));
-
-    args = new ArrayList<>(commonArgs);
-    args.addAll(Arrays.asList(new String[] {"--start-row", "0", "--count", "10000"}));
-    goodExec(VerifyIngest.class, args.toArray(new String[0]));
-  }
-
-  @Test
   public void testTeraSortAndRead() throws Exception {
     // TODO Figure out a way to run M/R with Kerberos
     assumeTrue(getAdminToken() instanceof PasswordToken);
