Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/04/12 19:55:39 UTC
[accumulo-examples] branch master updated: Update bulkIngest example. Fixes #17 (#20)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-examples.git
The following commit(s) were added to refs/heads/master by this push:
new a3cfb34 Update bulkIngest example. Fixes #17 (#20)
a3cfb34 is described below
commit a3cfb3472ec48e76c78806879add29e49a99627c
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Apr 12 15:55:35 2018 -0400
Update bulkIngest example. Fixes #17 (#20)
---
docs/bulkIngest.md | 26 +++++++----
.../examples/mapreduce/bulk/BulkIngestExample.java | 45 +++++++++---------
.../examples/mapreduce/bulk/GenerateTestData.java | 53 ----------------------
.../examples/mapreduce/bulk/SetupTable.java | 47 +++++++++++--------
.../examples/mapreduce/bulk/VerifyIngest.java | 22 ++-------
.../org/apache/accumulo/examples/ExamplesIT.java | 27 -----------
6 files changed, 76 insertions(+), 144 deletions(-)
diff --git a/docs/bulkIngest.md b/docs/bulkIngest.md
index b856d83..5dda1d1 100644
--- a/docs/bulkIngest.md
+++ b/docs/bulkIngest.md
@@ -16,18 +16,28 @@ limitations under the License.
-->
# Apache Accumulo Bulk Ingest Example
-This is an example of how to bulk ingest data into accumulo using map reduce.
+This is an example of how to bulk ingest data into Accumulo using map reduce.
+
+This tutorial uses the following Java classes.
+
+ * [SetupTable.java] - creates the table and some data to ingest
+ * [BulkIngestExample.java] - ingests the data using map reduce
+ * [VerifyIngest.java] - checks that the data was ingested
+
+Remember to copy the accumulo-examples-\*.jar to Accumulo's 'lib/ext' directory.
+
+ $ cp target/accumulo-examples-*.jar /path/accumulo/lib/ext
The following commands show how to run this example. It creates a
table called test_bulk with two initial split points. Then 1000 rows of
test data are created in HDFS. After that, the 1000 rows are ingested into
-accumulo. Then we verify the 1000 rows are in accumulo.
+Accumulo. Then we verify the 1000 rows are in Accumulo.
$ PKG=org.apache.accumulo.examples.mapreduce.bulk
- $ ARGS="-c examples.conf"
- $ accumulo $PKG.SetupTable $ARGS -t test_bulk row_00000333 row_00000666
- $ accumulo $PKG.GenerateTestData --start-row 0 --count 1000 --output bulk/test_1.txt
- $ accumulo-util hadoop-jar target/accumulo-examples-X.Y.Z.jar $PKG.BulkIngestExample $ARGS -t test_bulk --inputDir bulk --workDir tmp/bulkWork
- $ accumulo $PKG.VerifyIngest $ARGS -t test_bulk --start-row 0 --count 1000
+ $ accumulo $PKG.SetupTable
+ $ accumulo-util hadoop-jar target/accumulo-examples-*.jar $PKG.BulkIngestExample
+ $ ./bin/runex mapreduce.bulk.VerifyIngest
-For a high level discussion of bulk ingest, see the docs dir.
+[SetupTable.java]: ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
+[BulkIngestExample.java]: ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
+[VerifyIngest.java]: ../src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
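All three updated classes now read connection settings from conf/accumulo-client.properties instead of taking per-command flags. As a minimal standalone sketch of that shared pattern, mirroring the builder calls in the diffs below (the class name ConnectSketch is illustrative, not part of the commit):

    import org.apache.accumulo.core.client.Connector;

    public class ConnectSketch {
      public static void main(String[] args) throws Exception {
        // Build a Connector from the shared client properties file;
        // SetupTable, BulkIngestExample and VerifyIngest all use this call.
        Connector conn = Connector.builder()
            .usingProperties("conf/accumulo-client.properties")
            .build();
        System.out.println("connected as " + conn.whoami());
      }
    }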
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
index 9754243..f87a768 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
@@ -22,11 +22,15 @@ import java.io.PrintStream;
import java.util.Base64;
import java.util.Collection;
+import org.apache.accumulo.core.client.ConnectionInfo;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.examples.cli.MapReduceClientOnRequiredTable;
import org.apache.hadoop.conf.Configuration;
@@ -48,7 +52,11 @@ import com.beust.jcommander.Parameter;
/**
* Example map reduce job that bulk ingests data into an Accumulo table. The expected input is text files containing tab-separated key-value pairs on each line.
*/
+
public class BulkIngestExample extends Configured implements Tool {
+ static String workDir = "tmp/bulkWork";
+ static String inputDir = "bulk";
+
public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
private Text outputKey = new Text();
private Text outputValue = new Text();
@@ -94,18 +102,8 @@ public class BulkIngestExample extends Configured implements Tool {
}
}
- static class Opts extends MapReduceClientOnRequiredTable {
- @Parameter(names = "--inputDir", required = true)
- String inputDir;
- @Parameter(names = "--workDir", required = true)
- String workDir;
- }
-
@Override
public int run(String[] args) {
- Opts opts = new Opts();
- opts.parseArgs(BulkIngestExample.class.getName(), args);
-
Configuration conf = getConf();
PrintStream out = null;
try {
@@ -121,17 +119,22 @@ public class BulkIngestExample extends Configured implements Tool {
job.setReducerClass(ReduceClass.class);
job.setOutputFormatClass(AccumuloFileOutputFormat.class);
- opts.setAccumuloConfigs(job);
- Connector connector = opts.getConnector();
+ Connector connector = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+ ConnectionInfo connectionInfo = Connector.builder().usingProperties("conf/accumulo-client.properties").info();
+ AccumuloInputFormat.setConnectionInfo(job, connectionInfo);
+ AccumuloInputFormat.setInputTableName(job, SetupTable.tableName);
+ AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
+ AccumuloOutputFormat.setCreateTables(job, true);
+ AccumuloOutputFormat.setDefaultTableName(job, SetupTable.tableName);
- TextInputFormat.setInputPaths(job, new Path(opts.inputDir));
- AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.workDir + "/files"));
+ TextInputFormat.setInputPaths(job, new Path(inputDir));
+ AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
FileSystem fs = FileSystem.get(conf);
- out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.workDir + "/splits.txt"))));
+ out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
- Collection<Text> splits = connector.tableOperations().listSplits(opts.getTableName(), 100);
+ Collection<Text> splits = connector.tableOperations().listSplits(SetupTable.tableName, 100);
for (Text split : splits)
out.println(Base64.getEncoder().encodeToString(TextUtil.getBytes(split)));
@@ -139,16 +142,16 @@ public class BulkIngestExample extends Configured implements Tool {
out.close();
job.setPartitionerClass(RangePartitioner.class);
- RangePartitioner.setSplitFile(job, opts.workDir + "/splits.txt");
+ RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
job.waitForCompletion(true);
- Path failures = new Path(opts.workDir, "failures");
+ Path failures = new Path(workDir, "failures");
fs.delete(failures, true);
- fs.mkdirs(new Path(opts.workDir, "failures"));
+ fs.mkdirs(new Path(workDir, "failures"));
// With HDFS permissions on, we need to make sure the Accumulo user can read/move the rfiles
FsShell fsShell = new FsShell(conf);
- fsShell.run(new String[] {"-chmod", "-R", "777", opts.workDir});
- connector.tableOperations().importDirectory(opts.getTableName(), opts.workDir + "/files", opts.workDir + "/failures", false);
+ fsShell.run(new String[] {"-chmod", "-R", "777", workDir});
+ connector.tableOperations().importDirectory(SetupTable.tableName, workDir + "/files", workDir + "/failures", false);
} catch (Exception e) {
throw new RuntimeException(e);
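The hunks above elide the body of MapClass.map(). Going by the javadoc's note that input lines are tab-separated key-value pairs, here is a hedged sketch of such a mapper body, slotting into the MapClass shown earlier and using its outputKey/outputValue fields (the repository's actual body may differ):

    @Override
    public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
      // split each input line at the first tab: the left side becomes the
      // output key, the right side the output value
      int index = value.find("\t");
      if (index > 0) {
        outputKey.set(value.getBytes(), 0, index);
        outputValue.set(value.getBytes(), index + 1, value.getLength() - (index + 1));
        output.write(outputKey, outputValue);
      }
    }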
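run() also writes the table's current splits to splits.txt, one Base64-encoded row per line, which is the file RangePartitioner.setSplitFile() points the partitioner at. A tiny standalone sketch of that encoding round trip:

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class SplitEncodingSketch {
      public static void main(String[] args) {
        // Base64 keeps arbitrary row bytes intact in the line-oriented splits.txt
        String split = "row_00000333";
        String encoded = Base64.getEncoder()
            .encodeToString(split.getBytes(StandardCharsets.UTF_8));
        System.out.println(encoded);
        // decoding recovers the original split row
        System.out.println(new String(Base64.getDecoder().decode(encoded),
            StandardCharsets.UTF_8));
      }
    }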
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
deleted file mode 100644
index 4622ea0..0000000
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.mapreduce.bulk;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.beust.jcommander.Parameter;
-
-public class GenerateTestData {
-
- static class Opts extends org.apache.accumulo.core.cli.Help {
- @Parameter(names = "--start-row", required = true)
- int startRow = 0;
- @Parameter(names = "--count", required = true)
- int numRows = 0;
- @Parameter(names = "--output", required = true)
- String outputFile;
- }
-
- public static void main(String[] args) throws IOException {
- Opts opts = new Opts();
- opts.parseArgs(GenerateTestData.class.getName(), args);
-
- FileSystem fs = FileSystem.get(new Configuration());
- PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.outputFile))));
-
- for (int i = 0; i < opts.numRows; i++) {
- out.println(String.format("row_%010d\tvalue_%010d", i + opts.startRow, i + opts.startRow));
- }
- out.close();
- }
-
-}
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
index 17e549d..225be47 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
@@ -16,35 +16,46 @@
*/
package org.apache.accumulo.examples.mapreduce.bulk;
-import java.util.ArrayList;
-import java.util.List;
+import java.io.BufferedOutputStream;
+import java.io.PrintStream;
import java.util.TreeSet;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import com.beust.jcommander.Parameter;
public class SetupTable {
- static class Opts extends ClientOnRequiredTable {
- @Parameter(description = "<split> { <split> ... } ")
- List<String> splits = new ArrayList<>();
- }
+ static String[] splits = {"row_00000333", "row_00000666"};
+ static String tableName = "test_bulk";
+ static int numRows = 1000;
+ static String outputFile = "bulk/test_1.txt";
public static void main(String[] args) throws Exception {
- Opts opts = new Opts();
- opts.parseArgs(SetupTable.class.getName(), args);
- Connector conn = opts.getConnector();
- conn.tableOperations().create(opts.getTableName());
- if (!opts.splits.isEmpty()) {
- // create a table with initial partitions
- TreeSet<Text> initialPartitions = new TreeSet<>();
- for (String split : opts.splits) {
- initialPartitions.add(new Text(split));
+ Connector conn = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+ try {
+ conn.tableOperations().create(tableName);
+ } catch (TableExistsException e) {
+ // ignore; the table may already exist from a previous run
+ }
+
+ // create a table with initial partitions
+ TreeSet<Text> initialPartitions = new TreeSet<>();
+ for (String split : splits) {
+ initialPartitions.add(new Text(split));
+ }
+ conn.tableOperations().addSplits(tableName, initialPartitions);
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(outputFile))))) {
+ // create some data in outputFile
+ for (int i = 0; i < numRows; i++) {
+ out.println(String.format("row_%010d\tvalue_%010d", i, i));
}
- conn.tableOperations().addSplits(opts.getTableName(), intialPartitions);
}
}
}
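After SetupTable runs, test_bulk should hold exactly the two splits hardcoded above. A quick sanity check, not part of this commit (ListSplitsSketch is an illustrative name):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.hadoop.io.Text;

    public class ListSplitsSketch {
      public static void main(String[] args) throws Exception {
        Connector conn = Connector.builder()
            .usingProperties("conf/accumulo-client.properties").build();
        // expect row_00000333 and row_00000666, the splits SetupTable added
        for (Text split : conn.tableOperations().listSplits("test_bulk")) {
          System.out.println(split);
        }
      }
    }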
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
index 0dead6d..6fd1318 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
@@ -27,37 +27,25 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.beust.jcommander.Parameter;
-
public class VerifyIngest {
private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class);
- static class Opts extends ClientOnRequiredTable {
- @Parameter(names = "--start-row")
- int startRow = 0;
- @Parameter(names = "--count", required = true, description = "number of rows to verify")
- int numRows = 0;
- }
-
public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- Opts opts = new Opts();
- opts.parseArgs(VerifyIngest.class.getName(), args);
-
- Connector connector = opts.getConnector();
- Scanner scanner = connector.createScanner(opts.getTableName(), opts.auths);
+ Connector connector = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+ Scanner scanner = connector.createScanner(SetupTable.tableName, Authorizations.EMPTY);
- scanner.setRange(new Range(new Text(String.format("row_%010d", opts.startRow)), null));
+ scanner.setRange(new Range(String.format("row_%010d", 0), null));
Iterator<Entry<Key,Value>> si = scanner.iterator();
boolean ok = true;
- for (int i = opts.startRow; i < opts.numRows; i++) {
+ for (int i = 0; i < SetupTable.numRows; i++) {
if (si.hasNext()) {
Entry<Key,Value> entry = si.next();
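The hunk is cut off before the comparison inside the loop. A hedged sketch of how each scanned entry is likely checked against the row/value format SetupTable writes (the repository's actual code may differ in details):

    // continues after 'entry = si.next();' inside the loop
    String expectedRow = String.format("row_%010d", i);
    String expectedValue = String.format("value_%010d", i);
    if (!entry.getKey().getRow().toString().equals(expectedRow)) {
      log.error("unexpected row " + entry.getKey().getRow() + " at index " + i);
      ok = false;
    } else if (!entry.getValue().toString().equals(expectedValue)) {
      log.error("unexpected value " + entry.getValue() + " at index " + i);
      ok = false;
    }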
diff --git a/src/test/java/org/apache/accumulo/examples/ExamplesIT.java b/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
index 7240304..0087eb4 100644
--- a/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
+++ b/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
@@ -77,7 +77,6 @@ import org.apache.accumulo.examples.mapreduce.TableToFile;
import org.apache.accumulo.examples.mapreduce.TeraSortIngest;
import org.apache.accumulo.examples.mapreduce.WordCount;
import org.apache.accumulo.examples.mapreduce.bulk.BulkIngestExample;
-import org.apache.accumulo.examples.mapreduce.bulk.GenerateTestData;
import org.apache.accumulo.examples.mapreduce.bulk.SetupTable;
import org.apache.accumulo.examples.mapreduce.bulk.VerifyIngest;
import org.apache.accumulo.examples.shard.ContinuousQuery;
@@ -404,32 +403,6 @@ public class ExamplesIT extends AccumuloClusterHarness {
}
@Test
- public void testBulkIngest() throws Exception {
- // TODO Figure out a way to run M/R with Kerberos
- assumeTrue(getAdminToken() instanceof PasswordToken);
- String tableName = getUniqueNames(1)[0];
- FileSystem fs = getFileSystem();
- Path p = new Path(dir, "tmp");
- if (fs.exists(p)) {
- fs.delete(p, true);
- }
- goodExec(GenerateTestData.class, "--start-row", "0", "--count", "10000", "--output", dir + "/tmp/input/data");
-
- List<String> commonArgs = new ArrayList<>(Arrays.asList(new String[] {"-c", getConnectionFile(), "--table", tableName}));
-
- List<String> args = new ArrayList<>(commonArgs);
- goodExec(SetupTable.class, args.toArray(new String[0]));
-
- args = new ArrayList<>(commonArgs);
- args.addAll(Arrays.asList(new String[] {"--inputDir", dir + "/tmp/input", "--workDir", dir + "/tmp"}));
- goodExec(BulkIngestExample.class, args.toArray(new String[0]));
-
- args = new ArrayList<>(commonArgs);
- args.addAll(Arrays.asList(new String[] {"--start-row", "0", "--count", "10000"}));
- goodExec(VerifyIngest.class, args.toArray(new String[0]));
- }
-
- @Test
public void testTeraSortAndRead() throws Exception {
// TODO Figure out a way to run M/R with Kerberos
assumeTrue(getAdminToken() instanceof PasswordToken);
--
To stop receiving notification emails like this one, please contact
mmiller@apache.org.