Posted to commits@accumulo.apache.org by mw...@apache.org on 2019/04/24 21:49:43 UTC

[accumulo-website] branch master updated: Created docs for using Apache Spark with Accumulo (#171)

This is an automated email from the ASF dual-hosted git repository.

mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 858470e  Created docs for using Apache Spark with Accumulo (#171)
858470e is described below

commit 858470eece88756783aa5659cafafe5e0e92ba97
Author: Mike Walch <mw...@apache.org>
AuthorDate: Wed Apr 24 17:49:39 2019 -0400

    Created docs for using Apache Spark with Accumulo (#171)
---
 _docs-2/development/proxy.md |   2 +-
 _docs-2/development/spark.md | 109 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 110 insertions(+), 1 deletion(-)

diff --git a/_docs-2/development/proxy.md b/_docs-2/development/proxy.md
index 1a5f598..9bd5a1c 100644
--- a/_docs-2/development/proxy.md
+++ b/_docs-2/development/proxy.md
@@ -1,7 +1,7 @@
 ---
 title: Proxy
 category: development
-order: 3
+order: 7
 ---
 
 The proxy API allows interaction with Accumulo using languages other than Java.
diff --git a/_docs-2/development/spark.md b/_docs-2/development/spark.md
new file mode 100644
index 0000000..e1bb251
--- /dev/null
+++ b/_docs-2/development/spark.md
@@ -0,0 +1,109 @@
+---
+title: Spark
+category: development
+order: 3
+---
+
+[Apache Spark] applications can read from and write to Accumulo tables.
+
+Before reading this documentation, it may help to review the [MapReduce]
+documentation, as the API created for MapReduce jobs is also used by Spark.
+
+This documentation references code from the Accumulo [Spark example].
+
+## General configuration
+
+1. Create a [shaded jar] with your Spark code and all of your dependencies (excluding
+   Spark and Hadoop). When creating the shaded jar, you should relocate Guava, as
+   Accumulo uses a different version. The [pom.xml] in the [Spark example] is a good
+   reference and can be used as a starting point for a Spark application. A minimal
+   relocation sketch is shown after this list.
+
+2. Submit the job by running `spark-submit` with your shaded jar. You should pass
+   in the location of your `accumulo-client.properties` file, which will be used to
+   connect to your Accumulo instance.
+    ```bash
+    $SPARK_HOME/bin/spark-submit \
+      --class com.my.spark.job.MainClass \
+      --master yarn \
+      --deploy-mode client \
+      /path/to/spark-job-shaded.jar \
+      /path/to/accumulo-client.properties
+    ```
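+
+Below is a minimal, hypothetical sketch of the Guava relocation described in step 1,
+using the [shaded jar] plugin. The `shadedPattern` shown here is illustrative; see the
+[pom.xml] in the [Spark example] for the actual configuration.
+
+```xml
+<!-- Illustrative maven-shade-plugin configuration: relocating Guava keeps the
+     application's Guava version from conflicting with the version used by Accumulo. -->
+<plugin>
+  <groupId>org.apache.maven.plugins</groupId>
+  <artifactId>maven-shade-plugin</artifactId>
+  <executions>
+    <execution>
+      <phase>package</phase>
+      <goals><goal>shade</goal></goals>
+      <configuration>
+        <relocations>
+          <relocation>
+            <pattern>com.google.common</pattern>
+            <shadedPattern>shaded.com.google.common</shadedPattern>
+          </relocation>
+        </relocations>
+      </configuration>
+    </execution>
+  </executions>
+</plugin>
+```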
+
+## Reading from an Accumulo table
+
+Apache Spark can read from an Accumulo table by using [AccumuloInputFormat].
+
+```java
+Job job = Job.getInstance();
+AccumuloInputFormat.configure().clientProperties(props).table(inputTable).store(job);
+JavaPairRDD<Key,Value> data = sc.newAPIHadoopRDD(job.getConfiguration(),
+    AccumuloInputFormat.class, Key.class, Value.class);
+```
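+
+As a minimal follow-up (purely illustrative; `data` and `inputTable` are the variables
+from the snippet above), the resulting RDD can be used like any other Spark pair RDD:
+
+```java
+// Count the entries read from the Accumulo table (illustrative usage only)
+long entryCount = data.count();
+System.out.println("Read " + entryCount + " entries from " + inputTable);
+```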
+
+## Writing to an Accumulo table
+
+There are two ways to write to an Accumulo table.
+
+### Using a BatchWriter
+
+Write your data to Accumulo by creating an AccumuloClient for each partition and writing all
+data in the partition using a BatchWriter.
+
+```java
+// Spark will automatically serialize this properties object and send it to each partition
+Properties props = Accumulo.newClientProperties()
+                    .from("/path/to/accumulo-client.properties").build();
+JavaPairRDD<Key, Value> dataToWrite = ... ;
+dataToWrite.foreachPartition(iter -> {
+  // Create client inside partition so that Spark does not attempt to serialize it.
+  try (AccumuloClient client = Accumulo.newClient().from(props).build();
+       BatchWriter bw = client.createBatchWriter(outputTable)) {
+    iter.forEachRemaining(kv -> {
+      Key key = kv._1;
+      Value val = kv._2;
+      Mutation m = new Mutation(key.getRow());
+      m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
+          .visibility(key.getColumnVisibility()).timestamp(key.getTimestamp()).put(val);
+      bw.addMutation(m);
+    });
+  }
+});
+```
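+
+The snippet above leaves the creation of `dataToWrite` open. As a purely hypothetical
+sketch (the sample rows and values are made up), such an RDD could be built for a test
+like this, assuming `sc` is a `JavaSparkContext`:
+
+```java
+// Hypothetical test data; real jobs usually derive this RDD from earlier transformations
+List<Tuple2<Key,Value>> pairs = Arrays.asList(
+    new Tuple2<>(new Key("row1", "cf", "cq"), new Value("v1".getBytes(StandardCharsets.UTF_8))),
+    new Tuple2<>(new Key("row2", "cf", "cq"), new Value("v2".getBytes(StandardCharsets.UTF_8))));
+JavaPairRDD<Key,Value> dataToWrite = sc.parallelizePairs(pairs);
+```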
+
+### Using Bulk Import
+
+Partition your data and write it to RFiles. The [AccumuloRangePartitioner] found in the Accumulo
+Spark example can be used for partitioning data. After your data has been written to an output
+directory using [AccumuloFileOutputFormat] as RFiles, bulk import this directory into Accumulo.
+
+```java
+// Write Spark output to HDFS
+JavaPairRDD<Key, Value> dataToWrite = ... ;
+Job job = Job.getInstance();
+AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
+Partitioner partitioner = new AccumuloRangePartitioner("3", "7");
+JavaPairRDD<Key, Value> partData = dataToWrite.repartitionAndSortWithinPartitions(partitioner);
+partData.saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
+    AccumuloFileOutputFormat.class, job.getConfiguration());
+
+// Bulk import RFiles in HDFS into Accumulo
+try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+  client.tableOperations().importDirectory(outputDir.toString()).to(outputTable).load();
+}
+```
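+
+As an optional, illustrative sanity check (not part of the bulk import itself), the table
+can be scanned after the import to confirm the entries are visible; `props` and
+`outputTable` are the same variables used above:
+
+```java
+// Scan the table to verify the bulk-imported entries (illustrative only)
+try (AccumuloClient client = Accumulo.newClient().from(props).build();
+     Scanner scanner = client.createScanner(outputTable, Authorizations.EMPTY)) {
+  scanner.forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
+}
+```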
+
+## Reference
+
+* [Spark example] - Accumulo example application that uses Spark to read from and write to Accumulo
+* [MapReduce] - Documentation on reading/writing to Accumulo using MapReduce
+* [Apache Spark] - Spark project website
+
+[Apache Spark]: https://spark.apache.org/
+[MapReduce]: {% durl development/mapreduce %}
+[pom.xml]: https://github.com/apache/accumulo-examples/blob/master/pom.xml
+[Spark example]: https://github.com/apache/accumulo-examples/tree/master/spark
+[shaded jar]: https://maven.apache.org/plugins/maven-shade-plugin/index.html
+[AccumuloInputFormat]: {% jurl org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat %}
+[AccumuloFileOutputFormat]: {% jurl org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat %}
+[AccumuloRangePartitioner]: https://github.com/apache/accumulo-examples/blob/master/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java#L44