Posted to commits@accumulo.apache.org by mw...@apache.org on 2019/04/24 21:49:43 UTC
[accumulo-website] branch master updated: Created docs for using Apache Spark with Accumulo (#171)
This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-website.git
The following commit(s) were added to refs/heads/master by this push:
new 858470e Created docs for using Apache Spark with Accumulo (#171)
858470e is described below
commit 858470eece88756783aa5659cafafe5e0e92ba97
Author: Mike Walch <mw...@apache.org>
AuthorDate: Wed Apr 24 17:49:39 2019 -0400
Created docs for using Apache Spark with Accumulo (#171)
---
_docs-2/development/proxy.md | 2 +-
_docs-2/development/spark.md | 109 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 110 insertions(+), 1 deletion(-)
diff --git a/_docs-2/development/proxy.md b/_docs-2/development/proxy.md
index 1a5f598..9bd5a1c 100644
--- a/_docs-2/development/proxy.md
+++ b/_docs-2/development/proxy.md
@@ -1,7 +1,7 @@
---
title: Proxy
category: development
-order: 3
+order: 7
---
The proxy API allows the interaction with Accumulo with languages other than Java.
diff --git a/_docs-2/development/spark.md b/_docs-2/development/spark.md
new file mode 100644
index 0000000..e1bb251
--- /dev/null
+++ b/_docs-2/development/spark.md
@@ -0,0 +1,109 @@
+---
+title: Spark
+category: development
+order: 3
+---
+
+[Apache Spark] applications can read from and write to Accumulo tables.
+
+Before reading this documentation, it may help to review the [MapReduce]
+documentation, as the API created for MapReduce jobs is also used by Spark.
+
+This documentation references code from the Accumulo [Spark example].
+
+## General configuration
+
+1. Create a [shaded jar] with your Spark code and all of your dependencies (excluding
+ Spark and Hadoop). When creating the shaded jar, you should relocate Guava
+ as Accumulo uses a different version. The [pom.xml] in the [Spark example] is
+ a good reference and can be used as a starting point for a Spark application.
+
+2. Submit the job by running `spark-submit` with your shaded jar. You should pass in the
+ path to the `accumulo-client.properties` file that will be used to connect to your
+ Accumulo instance.
+ ```bash
+ $SPARK_HOME/bin/spark-submit \
+ --class com.my.spark.job.MainClass \
+ --master yarn \
+ --deploy-mode client \
+ /path/to/spark-job-shaded.jar \
+ /path/to/accumulo-client.properties
+ ```
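As a minimal sketch, the class named by `--class` could read the properties path from its first application argument, which is the last item on the `spark-submit` command line above. This version uses plain `java.util.Properties` so it runs without Accumulo on the classpath; `parseClientProperties` and `loadClientProperties` are hypothetical helpers, the package declaration is omitted, and a real job could instead build the properties with `Accumulo.newClientProperties().from(path).build()` as the BatchWriter example on this page does.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical entry point matching `--class com.my.spark.job.MainClass`.
// The package declaration is omitted so the sketch compiles on its own.
class MainClass {

  // Parses client properties from any Reader (a file, or a string in tests).
  static Properties parseClientProperties(Reader reader) {
    Properties props = new Properties();
    try {
      props.load(reader);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // Loads the accumulo-client.properties file whose path spark-submit passes
  // as the first application argument (the argument after the shaded jar).
  static Properties loadClientProperties(String[] args) {
    if (args.length < 1) {
      throw new IllegalArgumentException(
          "usage: MainClass /path/to/accumulo-client.properties");
    }
    try (Reader reader = Files.newBufferedReader(Paths.get(args[0]))) {
      return parseClientProperties(reader);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Properties props = loadClientProperties(args);
    // Spark setup and the Accumulo reads/writes described below would go here.
    System.out.println("Loaded " + props.size() + " client properties");
  }
}
```

Failing fast on a missing argument gives a clear usage message on the driver instead of a connection error later in the job.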
+
+## Reading from an Accumulo table
+
+Apache Spark can read from an Accumulo table by using [AccumuloInputFormat].
+
+```java
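+// Configure AccumuloInputFormat to read inputTable using the client properties
+Job job = Job.getInstance();
+AccumuloInputFormat.configure().clientProperties(props).table(inputTable).store(job);
+// sc is a JavaSparkContext; each record in the RDD is an Accumulo Key/Value pair
+JavaPairRDD<Key,Value> data = sc.newAPIHadoopRDD(job.getConfiguration(),
+ AccumuloInputFormat.class, Key.class, Value.class);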
+```
+
+## Writing to an Accumulo table
+
+There are two ways to write to an Accumulo table.
+
+### Using a BatchWriter
+
+Write your data to Accumulo by creating an AccumuloClient for each partition and writing all
+data in the partition using a BatchWriter.
+
+```java
+// Spark will automatically serialize this properties object and send it to each partition
+Properties props = Accumulo.newClientProperties()
+ .from("/path/to/accumulo-client.properties").build();
+JavaPairRDD<Key, Value> dataToWrite = ... ;
+dataToWrite.foreachPartition(iter -> {
+ // Create client inside partition so that Spark does not attempt to serialize it.
+ try (AccumuloClient client = Accumulo.newClient().from(props).build();
+ BatchWriter bw = client.createBatchWriter(outputTable)) {
+ iter.forEachRemaining(kv -> {
+ Key key = kv._1;
+ Value val = kv._2;
+ Mutation m = new Mutation(key.getRow());
+ m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
+ .visibility(key.getColumnVisibility()).timestamp(key.getTimestamp()).put(val);
+ bw.addMutation(m);
+ });
+ }
+});
+```
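The key detail in this pattern is that the client and writer are created once per partition, inside the closure, so nothing non-serializable is shipped from the driver and no connection is opened per record. The JDK-only sketch below models that control flow so it can run without a cluster; `RecordingWriter` and `writeAll` are hypothetical stand-ins for an `AccumuloClient`/`BatchWriter` pair and for `foreachPartition`, not Accumulo or Spark APIs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

class PerPartitionWriteSketch {

  // Hypothetical stand-in for an AccumuloClient + BatchWriter pair.
  static class RecordingWriter implements AutoCloseable {
    final List<String> written = new ArrayList<>();
    boolean closed = false;

    void add(String record) {
      if (closed) {
        throw new IllegalStateException("writer already closed");
      }
      written.add(record);
    }

    @Override
    public void close() {
      // A real BatchWriter flushes any buffered mutations when it is closed.
      closed = true;
    }
  }

  // Mirrors dataToWrite.foreachPartition(iter -> { ... }): each partition opens
  // exactly one writer, drains its iterator into it, then closes the writer.
  static List<RecordingWriter> writeAll(List<List<String>> partitions,
      Supplier<RecordingWriter> openWriter) {
    List<RecordingWriter> opened = new ArrayList<>();
    for (List<String> partition : partitions) {
      Iterator<String> iter = partition.iterator();
      try (RecordingWriter writer = openWriter.get()) {
        opened.add(writer);
        iter.forEachRemaining(writer::add);
      }
    }
    return opened;
  }

  public static void main(String[] args) {
    List<List<String>> partitions = List.of(List.of("a", "b"), List.of("c"), List.of());
    List<RecordingWriter> writers = writeAll(partitions, RecordingWriter::new);
    System.out.println(writers.size() + " writers opened"); // prints "3 writers opened"
  }
}
```

Using try-with-resources, as the real example does, guarantees the writer is closed (and, for a real BatchWriter, flushed) even if the partition is empty.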
+
+### Using bulk import
+
+Partition your data and write it to RFiles. The [AccumuloRangePartitioner] found in the Accumulo
+Spark example can be used to partition the data. After your data has been written as RFiles to an
+output directory using [AccumuloFileOutputFormat], bulk import that directory into Accumulo.
+
+```java
+// Write Spark output to HDFS
+JavaPairRDD<Key, Value> dataToWrite = ... ;
+Job job = Job.getInstance();
+AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
+// Example split points; adjust them to fit your data
+Partitioner partitioner = new AccumuloRangePartitioner("3", "7");
+JavaPairRDD<Key, Value> partData = dataToWrite.repartitionAndSortWithinPartitions(partitioner);
+partData.saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
+ AccumuloFileOutputFormat.class, job.getConfiguration());
+
+// Bulk import RFiles in HDFS into Accumulo
+try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+ client.tableOperations().importDirectory(outputDir.toString()).to(outputTable).load();
+}
+```
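The partitioning and sorting steps can be modeled without Spark or Accumulo. The sketch below is a hedged, JDK-only approximation: `RangePartitionSketch` is a hypothetical class, not the `AccumuloRangePartitioner` itself, and it treats rows as plain Strings compared lexicographically. It assigns each row to a range bounded by split points such as `"3"` and `"7"` and then sorts each range, which is roughly what `repartitionAndSortWithinPartitions` does before each partition is written out as an RFile (RFiles must be written in sorted key order).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical JDK-only model of range partitioning plus sorting within
// partitions. Real jobs partition and sort Accumulo Keys, not Strings.
class RangePartitionSketch {
  private final List<String> splits;

  RangePartitionSketch(String... splitPoints) {
    List<String> sorted = new ArrayList<>(Arrays.asList(splitPoints));
    Collections.sort(sorted); // binary search below requires sorted splits
    this.splits = sorted;
  }

  // n split points define n + 1 row ranges.
  int numPartitions() {
    return splits.size() + 1;
  }

  // A row equal to a split point lands in the range that ends at that split,
  // mirroring Accumulo tablets, whose end row is inclusive.
  int getPartition(String row) {
    int index = Collections.binarySearch(splits, row);
    return index >= 0 ? index : -(index + 1);
  }

  // Models repartitionAndSortWithinPartitions: route each row to its range,
  // then sort each range so it could be written out in key order.
  List<List<String>> repartitionAndSort(List<String> rows) {
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions(); i++) {
      partitions.add(new ArrayList<>());
    }
    for (String row : rows) {
      partitions.get(getPartition(row)).add(row);
    }
    for (List<String> partition : partitions) {
      Collections.sort(partition);
    }
    return partitions;
  }

  public static void main(String[] args) {
    RangePartitionSketch partitioner = new RangePartitionSketch("3", "7");
    List<String> rows = List.of("8", "2", "5", "3", "9", "7", "1");
    // prints [[1, 2, 3], [5, 7], [8, 9]]
    System.out.println(partitioner.repartitionAndSort(rows));
  }
}
```

Because the comparison is lexicographic, a row such as `"10"` sorts before `"3"` and lands in the first range; real row keys are likewise ordered byte by byte, not numerically.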
+
+## Reference
+
+* [Spark example] - Accumulo example application that uses Spark to read from and write to Accumulo
+* [MapReduce] - Documentation on reading from and writing to Accumulo using MapReduce
+* [Apache Spark] - Spark project website
+
+[Apache Spark]: https://spark.apache.org/
+[MapReduce]: {% durl development/mapreduce %}
+[pom.xml]: https://github.com/apache/accumulo-examples/blob/master/pom.xml
+[Spark example]: https://github.com/apache/accumulo-examples/tree/master/spark
+[shaded jar]: https://maven.apache.org/plugins/maven-shade-plugin/index.html
+[AccumuloInputFormat]: {% jurl org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat %}
+[AccumuloFileOutputFormat]: {% jurl org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat %}
+[AccumuloRangePartitioner]: https://github.com/apache/accumulo-examples/blob/master/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java#L44