Posted to commits@accumulo.apache.org by mw...@apache.org on 2019/03/28 16:22:27 UTC
[accumulo-examples] branch master updated: Created Accumulo/Spark example (#39)
This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-examples.git
The following commit(s) were added to refs/heads/master by this push:
new 8c3264c Created Accumulo/Spark example (#39)
8c3264c is described below
commit 8c3264ce46500ab328297a0122e9ede669980938
Author: Mike Walch <mw...@apache.org>
AuthorDate: Thu Mar 28 12:22:23 2019 -0400
Created Accumulo/Spark example (#39)
---
README.md | 2 +
spark/.gitignore | 6 +
spark/README.md | 45 ++++++
spark/pom.xml | 117 +++++++++++++++
spark/run.sh | 28 ++++
.../java/org/apache/accumulo/spark/CopyPlus5K.java | 157 +++++++++++++++++++++
6 files changed, 355 insertions(+)
diff --git a/README.md b/README.md
index 77c91bc..0be1400 100644
--- a/README.md
+++ b/README.md
@@ -83,6 +83,7 @@ Each example below highlights a feature of Apache Accumulo.
| [rowhash] | Using MapReduce to read a table and write to a new column in the same table. |
| [sample] | Building and using sample data in Accumulo. |
| [shard] | Using the intersecting iterator with a term index partitioned by document. |
+| [spark] | Using Accumulo as input and output for Apache Spark jobs. |
| [tabletofile] | Using MapReduce to read a table and write one of its columns to a file in HDFS. |
| [terasort] | Generating random data and sorting it using Accumulo. |
| [uniquecols] | Use MapReduce to count unique columns in Accumulo |
@@ -120,6 +121,7 @@ This repository can be used to test Accumulo release candidates. See
[rowhash]: docs/rowhash.md
[sample]: docs/sample.md
[shard]: docs/shard.md
+[spark]: spark/README.md
[tabletofile]: docs/tabletofile.md
[terasort]: docs/terasort.md
[uniquecols]: docs/uniquecols.md
diff --git a/spark/.gitignore b/spark/.gitignore
new file mode 100644
index 0000000..f534230
--- /dev/null
+++ b/spark/.gitignore
@@ -0,0 +1,6 @@
+/.classpath
+/.project
+/.settings/
+/target/
+/*.iml
+/.idea
diff --git a/spark/README.md b/spark/README.md
new file mode 100644
index 0000000..af19029
--- /dev/null
+++ b/spark/README.md
@@ -0,0 +1,45 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Apache Accumulo Spark Example
+
+## Requirements
+
+* Accumulo 2.0+
+* Hadoop YARN installed & `HADOOP_CONF_DIR` set in environment
+* Spark installed & `SPARK_HOME` set in environment
+
+## Spark example
+
+The [CopyPlus5K] example will create an Accumulo table called `spark_example_input`
+and write 100 key/value entries to it, with values `0..99`. It then launches
+a Spark application that does the following:
+
+* Read data from the `spark_example_input` table using `AccumuloInputFormat`
+* Add 5000 to each value
+* Write the data to a new Accumulo table (called `spark_example_output`) using one of
+ two methods:
+ 1. **Bulk import** - Write the data to RFiles in HDFS using `AccumuloFileOutputFormat`,
+ then bulk import them into the Accumulo table.
+ 2. **Batchwriter** - Create a `BatchWriter` in the Spark code to write to the table.
+
+This application can be run using the command:
+
+ ./run.sh batch /path/to/accumulo-client.properties
+
+Change `batch` to `bulk` to use the bulk import method.
+
+[CopyPlus5K]: src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
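To sanity-check either method, the output table can be scanned back with the
Accumulo client API. The sketch below is not part of this commit and the class
name `VerifyCopyPlus5K` is illustrative; it assumes Accumulo 2.0+ and the same
`accumulo-client.properties` file passed to `run.sh`:

    import java.util.Map.Entry;
    import java.util.Properties;

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;

    public class VerifyCopyPlus5K {
      public static void main(String[] args) throws Exception {
        Properties props = Accumulo.newClientProperties().from(args[0]).build();
        try (AccumuloClient client = Accumulo.newClient().from(props).build();
            Scanner scanner = client.createScanner("spark_example_output", Authorizations.EMPTY)) {
          // Each row 000..099 should come back with its value increased by 5000
          for (Entry<Key,Value> entry : scanner) {
            System.out.println(entry.getKey().getRow() + " -> " + entry.getValue());
          }
        }
      }
    }

If either run succeeded, all 100 rows should print values in the range 5000..5099.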
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..67f5de2
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>21</version>
+ </parent>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-spark</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <name>Apache Accumulo Spark Example</name>
+ <description>Example Spark Application for Apache Accumulo</description>
+ <properties>
+ <accumulo.version>2.0.0-SNAPSHOT</accumulo.version>
+ <hadoop.version>3.2.0</hadoop.version>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <zookeeper.version>3.4.13</zookeeper.version>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>${accumulo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-hadoop-mapreduce</artifactId>
+ <version>${accumulo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>create-shade-jar</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>spark-shade-jar</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <finalName>${project.artifactId}-shaded</finalName>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>shaded</shadedClassifierName>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.accumulo:accumulo-native</exclude>
+ <exclude>org.apache.hadoop:*</exclude>
+ <exclude>org.apache.spark:*</exclude>
+ </excludes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <!-- Required as Accumulo uses a different version than Hadoop -->
+ <pattern>com.google.common</pattern>
+ <shadedPattern>shaded.com.google.common</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/spark/run.sh b/spark/run.sh
new file mode 100755
index 0000000..e1ab9c0
--- /dev/null
+++ b/spark/run.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+if [[ -z "$1" || -z "$2" ]]; then
+ echo "Usage: ./run.sh [bulk|batch] /path/to/accumulo-client.properties"
+ exit 1
+fi
+
+JAR=./target/accumulo-spark-shaded.jar
+if [[ ! -f $JAR ]]; then
+ mvn clean package -P create-shade-jar
+fi
+
+if [[ -z "$SPARK_HOME" ]]; then
+ echo "SPARK_HOME must be set!"
+ exit 1
+fi
+
+if [[ -z "$HADOOP_CONF_DIR" ]]; then
+ echo "HADOOP_CONF_DIR must be set!"
+ exit 1
+fi
+
+"$SPARK_HOME"/bin/spark-submit \
+ --class org.apache.accumulo.spark.CopyPlus5K \
+ --master yarn \
+ --deploy-mode client \
+ "$JAR" \
+ "$1" "$2"
diff --git a/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java b/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
new file mode 100644
index 0000000..4443a70
--- /dev/null
+++ b/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
@@ -0,0 +1,157 @@
+package org.apache.accumulo.spark;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class CopyPlus5K {
+
+ public static class AccumuloRangePartitioner extends Partitioner {
+
+ private static final long serialVersionUID = 1L;
+ private List<String> splits;
+
+ AccumuloRangePartitioner(String... listSplits) {
+ this.splits = Arrays.asList(listSplits);
+ }
+
+ @Override
+ public int getPartition(Object o) {
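+ // Collections.binarySearch returns -(insertionPoint) - 1 when the row is not
+ // itself a split point; the conversion below recovers the insertion point so
+ // each row maps to the partition covering its range.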
+ int index = Collections.binarySearch(splits, ((Key)o).getRow().toString());
+ index = index < 0 ? (index + 1) * -1 : index;
+ return index;
+ }
+
+ @Override
+ public int numPartitions() {
+ return splits.size() + 1;
+ }
+ }
+
+ private static void cleanupAndCreateTables(Properties props) throws Exception {
+ FileSystem hdfs = FileSystem.get(new Configuration());
+ if (hdfs.exists(rootPath)) {
+ hdfs.delete(rootPath, true);
+ }
+ try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+ if (client.tableOperations().exists(inputTable)) {
+ client.tableOperations().delete(inputTable);
+ }
+ if (client.tableOperations().exists(outputTable)) {
+ client.tableOperations().delete(outputTable);
+ }
+ // Create tables
+ client.tableOperations().create(inputTable);
+ client.tableOperations().create(outputTable);
+
+ // Write data to input table
+ try (BatchWriter bw = client.createBatchWriter(inputTable)) {
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(String.format("%03d", i));
+ m.at().family("cf1").qualifier("cq1").put("" + i);
+ bw.addMutation(m);
+ }
+ }
+ }
+ }
+
+ private static final String inputTable = "spark_example_input";
+ private static final String outputTable = "spark_example_output";
+ private static final Path rootPath = new Path("/spark_example/");
+
+ public static void main(String[] args) throws Exception {
+
+ if (args.length != 2 || (!args[0].equals("batch") && !args[0].equals("bulk"))) {
+ System.out.println("Usage: ./run.sh [batch|bulk] /path/to/accumulo-client.properties");
+ System.exit(1);
+ }
+
+ // Read client properties from file
+ final Properties props = Accumulo.newClientProperties().from(args[1]).build();
+
+ cleanupAndCreateTables(props);
+
+ SparkConf conf = new SparkConf();
+ conf.setAppName("CopyPlus5K");
+ // KryoSerializer is needed for serializing Accumulo Key when partitioning data for bulk import
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(new Class[]{Key.class, Value.class, Properties.class});
+
+ JavaSparkContext sc = new JavaSparkContext(conf);
+
+ Job job = Job.getInstance();
+
+ // Read input from Accumulo
+ AccumuloInputFormat.configure().clientProperties(props).table(inputTable).store(job);
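+ // store(job) saved the client properties and input table name into the Job's
+ // configuration, which newAPIHadoopRDD hands to AccumuloInputFormat below.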
+ JavaPairRDD<Key,Value> data = sc.newAPIHadoopRDD(job.getConfiguration(),
+ AccumuloInputFormat.class, Key.class, Value.class);
+
+ // Add 5K to all values
+ JavaPairRDD<Key, Value> dataPlus5K = data.mapValues(v ->
+ new Value("" + (Integer.parseInt(v.toString()) + 5_000)));
+
+ if (args[0].equals("batch")) {
+ // Write output using batch writer
+ dataPlus5K.foreachPartition(iter -> {
+ // Intentionally create an Accumulo client per partition, rather than trying
+ // to serialize one client and send it to each remote process.
+ try (AccumuloClient client = Accumulo.newClient().from(props).build();
+ BatchWriter bw = client.createBatchWriter(outputTable)) {
+ iter.forEachRemaining(kv -> {
+ Key key = kv._1;
+ Value val = kv._2;
+ Mutation m = new Mutation(key.getRow());
+ m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
+ .visibility(key.getColumnVisibility()).timestamp(key.getTimestamp()).put(val);
+ try {
+ bw.addMutation(m);
+ } catch (MutationsRejectedException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+ });
+ } else if (args[0].equals("bulk")) {
+ // Write output using bulk import
+
+ // Create HDFS directory for bulk import
+ FileSystem hdfs = FileSystem.get(new Configuration());
+ hdfs.mkdirs(rootPath);
+ Path outputDir = new Path(rootPath.toString() + "/output");
+
+ // Write Spark output to HDFS
+ AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
+ Partitioner partitioner = new AccumuloRangePartitioner("3", "7");
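+ // Note: rows are zero-padded ("000".."099"), so they all sort before the split
+ // points "3" and "7" and land in the first partition; splits like "03" and "07"
+ // would actually spread the rows across the three partitions.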
+ JavaPairRDD<Key, Value> partData = dataPlus5K.repartitionAndSortWithinPartitions(partitioner);
+ partData.saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
+ AccumuloFileOutputFormat.class);
+
+ // Bulk import into Accumulo
+ try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+ client.tableOperations().importDirectory(outputDir.toString()).to(outputTable).load();
+ }
+ } else {
+ System.out.println("Unknown method to write output: " + args[0]);
+ System.exit(1);
+ }
+ }
+}