Posted to by on 2016/12/22 18:27:11 UTC

incubator-fluo-website git commit: Added post about Spark+Fluo

Repository: incubator-fluo-website
Updated Branches:
  refs/heads/gh-pages a7d68d618 -> fd2d8ec22

Added post about Spark+Fluo


Branch: refs/heads/gh-pages
Commit: fd2d8ec22fbe580a9f4d44bb91d31c7b5b900ed0
Parents: a7d68d6
Author: Keith Turner <>
Authored: Tue Dec 20 18:26:07 2016 -0500
Committer: Keith Turner <>
Committed: Thu Dec 22 13:10:19 2016 -0500

 _config.yml                          |   2 +
 _posts/blog/ | 246 ++++++++++++++++++++++++++++++
 2 files changed, 248 insertions(+)
diff --git a/_config.yml b/_config.yml
index 32e7e38..578863d 100644
--- a/_config.yml
+++ b/_config.yml
@@ -50,6 +50,8 @@ api_base: ""
 api_static: ""
 fluo_api_base: ""
 fluo_api_static: ""
+fluo_recipes_core_static: ""
+fluo_recipes_spark_static: ""
 old_api_base: ""
 old_api_static: ""
diff --git a/_posts/blog/ b/_posts/blog/
new file mode 100644
index 0000000..d494d9b
--- /dev/null
+++ b/_posts/blog/
@@ -0,0 +1,246 @@
+title: "Loading data into Fluo using Apache Spark"
+date: 2016-12-22 11:43:00 +0000
+author: Keith Turner
+reviewers: Mike Walch
+[Apache Spark][spark] can be used to preprocess and load batches of data into Fluo.  For example,
+Spark could be used to group data within a batch and then Fluo transactions could load groups of
+related data. This blog post offers some tips to help you get started writing to Fluo from Spark.
+### Executing load transactions in Spark
+Spark automatically serializes Java objects that are needed for remote execution.  When using
+Fluo with Spark, it's important to understand what will serialize properly and what will not.
+Classes used to load data into Fluo, like [FluoClient] and [LoaderExecutor], are not suitable for
+serialization.  These classes may hold thread pools, resources in Zookeeper, transactions that are
+committing in the background, etc.  Therefore, these classes must be instantiated in each remote
+process that Spark creates.  One way to do this is with Spark's `foreachPartition` method, which
+runs a function once per RDD partition, on the executor that holds that partition.  Within each
+partition, a [LoaderExecutor] can be created.  That's what the example below shows.
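+import java.io.File;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.spark.api.java.JavaRDD;
+public void dedupeAndLoad(JavaRDD<Document> docRdd, int numPartitions) {
+  // Remove duplicate documents (requires Document to implement equals() and hashCode()).
+  docRdd = docRdd.distinct(numPartitions);
+  // Execute load transactions for unique documents.  In the Java 8 lambda syntax below,
+  // iter is of type Iterator<Document>.
+  docRdd.foreachPartition(iter -> {
+    // Runs on an executor once per partition.  Assume the properties file was
+    // submitted with the application.
+    FluoConfiguration fconf = new FluoConfiguration(new File(""));
+    try (FluoClient client = FluoFactory.newClient(fconf);
+        LoaderExecutor le = client.newLoaderExecutor()) {
+      while (iter.hasNext()) {
+        // DocumentLoader is an application class implementing Fluo's Loader interface.
+        le.execute(new DocumentLoader(iter.next()));
+      }
+    }
+  });
+}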
+The example above requires that `` is available locally to each
+partition.  This can be accomplished with the `--files` option when launching a Spark job, which
+places the listed files in the working directory of each executor.
+spark-submit --class myApp.Load --files <fluo props dir>/ myApp.jar
+If FluoConfiguration were serializable, then Spark could automatically serialize and make a
+FluoConfiguration object available for each partition.  However, FluoConfiguration is not
+serializable as of Fluo 1.0.0.  This will be fixed in future releases of Fluo.  See [#813][fluo-813]
+for details and workarounds for 1.0.0.
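Spark serializes closures with plain Java serialization, so one quick way to see which objects can be shipped to executors is to try serializing them with `java.io.ObjectOutputStream`. The sketch below uses only the JDK. `ClientLike` is a hypothetical stand-in for a class such as FluoClient that owns a thread pool, and copying settings into a `java.util.Properties` object is shown as one possible way to carry configuration into a partition, not necessarily the workaround described in #813.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SerializationCheck {
  // Hypothetical stand-in for a client class that owns a thread pool. It claims
  // to be Serializable, but its ExecutorService field is not.
  static final class ClientLike implements Serializable {
    final ExecutorService pool = Executors.newFixedThreadPool(1);
  }

  // Returns true if Java serialization (what Spark uses for closures) accepts obj.
  static boolean isJavaSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    ClientLike client = new ClientLike();
    System.out.println("client-like object: " + isJavaSerializable(client));
    client.pool.shutdown();

    // Plain key/value settings serialize fine and could be used to rebuild a
    // configuration object inside each partition.
    Properties props = new Properties();
    props.setProperty("example.key", "example-value"); // hypothetical key
    System.out.println("properties: " + isJavaSerializable(props));
  }
}
```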
+### Initializing Fluo table
+If you have a lot of existing data, then you could use Spark to initialize your Fluo table with
+historical data. There are two general ways to do this.  The simplest way is to use the
+[AccumuloOutputFormat] to write [Mutation] objects to Accumulo.  However, the data must be written
+in the Fluo data format, and Fluo provides the [FluoMutationGenerator] to make this easy.
+A slightly more complex way to initialize a Fluo table is using Accumulo's bulk load mechanism.
+Bulk loading consists of generating Accumulo RFiles containing key/values in a Spark job and then
+loading those files into an Accumulo table.  This can be faster, but it's more complex because it
+requires the user to properly partition data in their Spark job.  Ideally, these partitions would
+consist of non-overlapping ranges of Accumulo keys with roughly even amounts of data.  Spark's
+default hash partitioning will not accomplish this, because each partition ends up with keys
+scattered across the whole key space.
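A common way to get non-overlapping ranges with roughly even amounts of data is to sort a sample of row keys and take evenly spaced keys as split points. The plain-Java sketch below assumes rows are ASCII strings (so `String` ordering matches Accumulo's byte ordering) and that the sample is representative; `SplitChooser` and `chooseSplits` are hypothetical names, not Fluo or Spark APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitChooser {
  // Chooses up to numPartitions - 1 split points from a sample of row keys so that
  // each resulting range holds roughly the same number of sampled rows. As with an
  // Accumulo tablet end row, each split point is the last row of its range.
  // Assumes ASCII row keys, so String ordering matches Accumulo's byte ordering.
  static List<String> chooseSplits(List<String> sampledRows, int numPartitions) {
    if (numPartitions < 1) {
      throw new IllegalArgumentException("numPartitions must be >= 1");
    }
    String[] sorted = sampledRows.stream().distinct().sorted().toArray(String[]::new);
    List<String> splits = new ArrayList<>();
    for (int i = 1; i < numPartitions; i++) {
      // Index of the last sampled row that falls in range i - 1.
      int idx = (int) ((long) i * sorted.length / numPartitions) - 1;
      // Skip empty ranges and repeated split points (possible with tiny samples).
      if (idx >= 0 && (splits.isEmpty() || !splits.get(splits.size() - 1).equals(sorted[idx]))) {
        splits.add(sorted[idx]);
      }
    }
    return splits;
  }

  public static void main(String[] args) {
    List<String> sample = Arrays.asList("h", "a", "c", "b", "e", "d", "g", "f");
    System.out.println(chooseSplits(sample, 4)); // [b, d, f]
  }
}
```

With the splits `[b, d, f]`, the four ranges are `(-inf, b]`, `(b, d]`, `(d, f]` and `(f, +inf)`, each holding two of the sampled rows.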
+When following the bulk load approach, you would write [Key] and [Value] objects using the
+[AccumuloFileOutputFormat]. Fluo provides the [FluoKeyValueGenerator] to create key/values in the
+Fluo data format.  Fluo Recipes builds on this and provides code that makes it easy to bulk import
+into Accumulo.  The [FluoSparkHelper.bulkImportRcvToFluo()][bi2fluo] method does the following:
+ * Repartition data using the split points in the Fluo table
+ * Convert data into the format expected by a Fluo table
+ * Create an RFile for each partition in a specified temp dir
+ * Bulk import the RFiles into the Fluo table
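The first step in the list, routing data to partitions that line up with the table's split points, can be sketched in plain Java. Each partition corresponds to a tablet holding rows in (previous split, split], and each partition is then sorted because an RFile must be written in sorted key order. This is a simplified illustration that uses only row strings and hypothetical names (`SplitPartitioner`, `partitionFor`); it is not the Fluo Recipes implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SplitPartitioner {
  // Returns the partition (tablet) index for a row, given the table's sorted split
  // points. Partition i holds rows in (splits[i-1], splits[i]]; the last partition
  // holds every row after the final split.
  static int partitionFor(List<String> sortedSplits, String row) {
    int pos = Collections.binarySearch(sortedSplits, row);
    // A row equal to a split point belongs to the partition that ends at that split;
    // otherwise binarySearch encodes the insertion point as -(insertionPoint) - 1.
    return pos >= 0 ? pos : -(pos + 1);
  }

  // Groups rows by partition, then sorts each group, since an RFile must be
  // written in sorted key order.
  static List<List<String>> repartitionAndSort(List<String> rows, List<String> sortedSplits) {
    List<List<String>> parts = new ArrayList<>();
    for (int i = 0; i <= sortedSplits.size(); i++) {
      parts.add(new ArrayList<>());
    }
    for (String row : rows) {
      parts.get(partitionFor(sortedSplits, row)).add(row);
    }
    for (List<String> part : parts) {
      Collections.sort(part);
    }
    return parts;
  }

  public static void main(String[] args) {
    List<String> splits = Arrays.asList("g", "p"); // hypothetical table split points
    List<String> rows = Arrays.asList("z", "a", "g", "h", "p", "b");
    System.out.println(repartitionAndSort(rows, splits)); // [[a, b, g], [h, p], [z]]
  }
}
```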
+The [Webindex] example uses Spark and the bulk load code in Fluo Recipes to initialize its Fluo
+table with historical data.  Webindex uses multiple [Collision Free Maps][cfm] and initializes them
+using [CollisionFreeMap.getInitializer()][cfminit].  Webindex also uses Spark to execute load
+transactions in parallel when incrementally loading data.
+### Packaging your code to run in Spark
+One simple way to execute your Spark code is to create a shaded jar.  This shaded jar should contain
+Accumulo client code, Fluo client code, Zookeeper client code, and your application code.  It
+would be best if the shaded jar contained the versions of Accumulo, Fluo, and Zookeeper running on
+the target system.  One way to achieve this goal is to make it easy for users of your Fluo
+application to build the shaded jar themselves.  The examples below show a simple bash script and
+Maven pom file that achieve this goal.
+There is no need to include Spark code in the shaded jar, as it will be provided by the Spark
+runtime environment.  Depending on your Spark environment, Hadoop client code may also be provided,
+in which case Hadoop does not need to be included in the shaded jar either.  One way to exclude
+these from the shaded jar is to give these dependencies `provided` scope, which is what the example
+does.  You may also want to consider excluding other libraries that are provided in the Spark
+environment, like Guava, log4j, etc.
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId></groupId>
+  <artifactId>fluoAppShaded</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Shaded Fluo App</name>
+  <properties>
+    <accumulo.version>1.7.2</accumulo.version>
+    <fluo.version>1.0.0-incubating</fluo.version>
+    <zookeeper.version>3.4.9</zookeeper.version>
+  </properties>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <shadedArtifactAttached>true</shadedArtifactAttached>
+              <shadedClassifierName>shaded</shadedClassifierName>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <!--
+       The provided scope is used for dependencies that should not end up in
+       the shaded jar.  The shaded jar is used to run Spark jobs. The Spark 
+       launcher will provided Spark and Hadoop dependencies, so they are not
+       needed in the shaded jar.
+  -->
+  <dependencies>
+    <!-- The dependency on your Fluo application code.  Version of your app could be made configurable. -->
+    <dependency>
+      <groupId></groupId>
+      <artifactId>fluoApp</artifactId>
+      <version>1.2.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.fluo</groupId>
+      <artifactId>fluo-api</artifactId>
+      <version>${fluo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.fluo</groupId>
+      <artifactId>fluo-core</artifactId>
+      <version>${fluo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+      <version>${accumulo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.7.2</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>1.6.2</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>
+The following bash script can use the pom above to build a shaded jar.
+# Get the versions of Accumulo and Fluo running on the system.  Could let the
+# user of your Fluo application configure this and have this script read that
+# config.
+ACCUMULO_VERSION=$(accumulo version)
+FLUO_VERSION=$(fluo version)
+# There is no simple command that reports the Zookeeper version, so ask a server
+# using the "srvr" four-letter-word command.  Assumes ZOOKEEPER_SERVER is set to
+# the host of one of your Zookeeper servers, listening on port 2181.
+ZOOKEEPER_VERSION=$(echo srvr | nc "$ZOOKEEPER_SERVER" 2181 | grep version: | sed 's/.*version: \([0-9.]*\).*/\1/')
+# Build the shaded jar
+mvn package -Daccumulo.version="$ACCUMULO_VERSION" \
+            -Dfluo.version="$FLUO_VERSION" \
+            -Dzookeeper.version="$ZOOKEEPER_VERSION"
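The `grep`/`sed` pipeline above keeps only the dotted version number that follows `version:` in the Zookeeper server's response, dropping any build suffix. For readers less familiar with `sed`, the same extraction is sketched in Java below; the sample response line is an assumed example of Zookeeper 3.4 output, and `ZkVersionParser` is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ZkVersionParser {
  // Same idea as the sed expression: capture the dotted number after "version: ".
  private static final Pattern VERSION = Pattern.compile("version: ([0-9.]+)");

  // Returns the version found in a Zookeeper status response, or null if absent.
  static String parse(String response) {
    for (String line : response.split("\n")) {
      Matcher m = VERSION.matcher(line);
      if (m.find()) {
        return m.group(1);
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Assumed shape of the first lines of a Zookeeper 3.4 status response.
    String sample = "Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT\nLatency min/avg/max: 0/0/0\n";
    System.out.println(parse(sample)); // 3.4.9
  }
}
```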
+There are other possible ways to package and run your Fluo application for Spark.  This section
+suggested one of them.  Its core concept is late binding: the Accumulo, Fluo, and Zookeeper
+versions are chosen when the jar is built on the target system, and Hadoop and Spark are supplied
+by the runtime environment.  When choosing how to create a shaded jar, consider the implications
+of early versus late binding.
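With late binding it can be hard to tell which copy of a library was actually used at runtime, especially for libraries like Guava or log4j that the Spark environment may also supply. A small JDK-only diagnostic can report the jar a class was loaded from. The class name `ClassOrigin` is hypothetical, and the sketch assumes no security manager blocks `getProtectionDomain()`.

```java
import java.security.CodeSource;

class ClassOrigin {
  // Returns the jar or directory a class was loaded from. Classes from the JDK's
  // bootstrap loader (and some dynamically defined classes) have no location.
  static String locationOf(Class<?> clazz) {
    CodeSource source = clazz.getProtectionDomain().getCodeSource();
    if (source == null || source.getLocation() == null) {
      return "(no code source)";
    }
    return source.getLocation().toString();
  }

  public static void main(String[] args) throws ClassNotFoundException {
    // Pass fully qualified class names to check, e.g. a Guava or Fluo class.
    for (String name : args) {
      System.out.println(name + " -> " + locationOf(Class.forName(name)));
    }
  }
}
```

Calling `locationOf` from driver or executor code with a class from a library you are unsure about shows whether it came from your shaded jar or from the jars provided by the Spark environment.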
+[FluoClient]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/client/FluoClient.html
+[LoaderExecutor]: {{ site.fluo_api_static }}/{{ site.latest_fluo_release }}/org/apache/fluo/api/client/LoaderExecutor.html
+[bi2fluo]:{{ site.fluo_recipes_spark_static }}/{{ site.latest_recipes_release }}/org/apache/fluo/recipes/spark/
+[cfminit]:{{ site.fluo_recipes_core_static }}/{{ site.latest_recipes_release }}/org/apache/fluo/recipes/core/map/
+[cfm]: /docs/fluo-recipes/{{ site.latest_recipes_release }}/cfm/