Posted to commits@camel.apache.org by ac...@apache.org on 2016/06/07 08:44:48 UTC

camel git commit: Added camel-spark docs to Gitbook

Repository: camel
Updated Branches:
  refs/heads/master 8049c6af3 -> 00f72a29d


Added camel-spark docs to Gitbook


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/00f72a29
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/00f72a29
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/00f72a29

Branch: refs/heads/master
Commit: 00f72a29d5653e487dd21011fa96914261b9eb71
Parents: 8049c6a
Author: Andrea Cosentino <an...@gmail.com>
Authored: Tue Jun 7 10:44:11 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Tue Jun 7 10:44:11 2016 +0200

----------------------------------------------------------------------
 components/camel-spark/src/main/docs/spark.adoc | 413 +++++++++++++++++++
 docs/user-manual/en/SUMMARY.md                  |   1 +
 .../apache-spark.data/camel_spark_cluster.png   | Bin 0 -> 172359 bytes
 .../en/apache-spark.data/camel_spark_driver.png | Bin 0 -> 293927 bytes
 .../en/apache-spark.data/fabric_docker-(2).png  | Bin 0 -> 37481 bytes
 5 files changed, 414 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/00f72a29/components/camel-spark/src/main/docs/spark.adoc
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/docs/spark.adoc b/components/camel-spark/src/main/docs/spark.adoc
new file mode 100644
index 0000000..6620eb1
--- /dev/null
+++ b/components/camel-spark/src/main/docs/spark.adoc
@@ -0,0 +1,413 @@
+[[ApacheSpark-ApacheSparkcomponent]]
+Apache Spark component
+~~~~~~~~~~~~~~~~~~~~~~
+
+INFO: The Apache Spark component is available starting from Camel *2.17*.
+
+This documentation page covers the http://spark.apache.org/[Apache
+Spark] component for Apache Camel. The main purpose of the Spark
+integration with Camel is to provide a bridge between Camel connectors
+and Spark tasks. In particular, the Camel connector provides a way to
+route messages from various transports, dynamically choose a task to
+execute, use the incoming message as input data for that task, and
+finally deliver the results of the execution back to the Camel pipeline.
+
+[[ApacheSpark-Supportedarchitecturalstyles]]
+Supported architectural styles
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The Spark component can be used as a driver application deployed into
+an application server (or executed as a fat jar).
+
+image:apache-spark.data/camel_spark_driver.png[image] +
+
+The Spark component can also be submitted as a job directly into the
+Spark cluster.
+
+image:apache-spark.data/camel_spark_cluster.png[image] +
+
+While the Spark component is primarily designed to work as a _long
+running job_ serving as a bridge between the Spark cluster and the other
+endpoints, you can also use it as a _fire-once_ short job.
+
+[[ApacheSpark-RunningSparkinOSGiservers]]
+Running Spark in OSGi servers
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Currently the Spark component doesn't support execution in the OSGi
+container. Spark has been designed to be executed as a fat jar, usually
+submitted as a job to a cluster. For those reasons, running Spark in an
+OSGi server is at least challenging and is not supported by Camel either.
+
+[[ApacheSpark-URIformat]]
+URI format
+^^^^^^^^^^
+
+Currently the Spark component supports only producers - it is intended
+to invoke a Spark job and return results. You can call an RDD, DataFrame
+or Hive SQL job.
+
+*Spark URI format*
+
+[source,java]
+--------------------------
+spark:{rdd|dataframe|hive}
+--------------------------
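+
+For example, a minimal Java DSL route could bridge an incoming Camel
+endpoint to a Spark RDD job. The `direct:analyze` endpoint and the
+`#myRdd` and `#countLines` registry names below are purely illustrative:
+
+[source,java]
+-----------------------------------------------------------
+// Route messages from a Camel endpoint to a Spark RDD job;
+// the callback result becomes the message body.
+from("direct:analyze")
+    .to("spark:rdd?rdd=#myRdd&rddCallback=#countLines");
+-----------------------------------------------------------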
+
+[[ApacheSpark-options]]
+Spark options
++++++++++++++
+
+
+// component options: START
+The Apache Spark component supports 2 options which are listed below.
+
+
+
+{% raw %}
+[width="100%",cols="2s,1m,8",options="header"]
+|=======================================================================
+| Name | Java Type | Description
+| rdd | JavaRDDLike | RDD to compute against.
+| rddCallback | RddCallback | Function performing action against an RDD.
+|=======================================================================
+{% endraw %}
+// component options: END
+
+
+
+// endpoint options: START
+The Apache Spark component supports 8 endpoint options which are listed below:
+
+{% raw %}
+[width="100%",cols="2s,1,1m,1m,5",options="header"]
+|=======================================================================
+| Name | Group | Default | Java Type | Description
+| endpointType | producer |  | EndpointType | *Required* Type of the endpoint (rdd dataframe hive).
+| collect | producer | true | boolean | Indicates if results should be collected or counted.
+| dataFrame | producer |  | DataFrame | DataFrame to compute against.
+| dataFrameCallback | producer |  | DataFrameCallback | Function performing action against a DataFrame.
+| rdd | producer |  | JavaRDDLike | RDD to compute against.
+| rddCallback | producer |  | RddCallback | Function performing action against an RDD.
+| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
+| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
+|=======================================================================
+{% endraw %}
+// endpoint options: END
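+
+For illustration, several of these options can be combined on a single
+endpoint URI; the `#myRdd` and `#myCallback` registry names below are
+hypothetical:
+
+[source,java]
+-------------------------------------------------------------
+// collect=false returns a count instead of collected results
+spark:rdd?rdd=#myRdd&rddCallback=#myCallback&collect=false
+-------------------------------------------------------------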
+
+[[ApacheSpark-RDDjobs]]
+RDD jobs
+^^^^^^^^
+
+To invoke an RDD job, use the following URI:
+
+*Spark RDD producer*
+
+[source,java]
+------------------------------------------------------
+spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
+------------------------------------------------------
+
+Where the `rdd` option refers to the name of an RDD instance (a subclass
+of `org.apache.spark.api.java.JavaRDDLike`) from a Camel registry, while
+`rddCallback` refers to the implementation of the
+`org.apache.camel.component.spark.RddCallback` interface (also from a
+registry). The RDD callback provides a single method used to apply
+incoming messages against the given RDD. Results of callback
+computations are saved as a body to an exchange.
+
+*Spark RDD callback*
+
+[source,java]
+-------------------------------------------------
+public interface RddCallback<T> {
+    T onRdd(JavaRDDLike rdd, Object... payloads);
+}
+-------------------------------------------------
+
+The following snippet demonstrates how to send a message as input to
+the job and return results:
+
+*Calling spark job*
+
+[source,java]
+------------------------------------------------------------------------------------------------------------------------------
+String pattern = "job input";
+long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
+------------------------------------------------------------------------------------------------------------------------------
+
+The RDD callback for the snippet above, registered as a Spring bean,
+could look as follows:
+
+*Spark RDD callback*
+
+[source,java]
+------------------------------------------------------------------------
+@Bean
+RddCallback<Long> countLinesContaining() {
+    return new RddCallback<Long>() {
+        @Override
+        public Long onRdd(JavaRDDLike rdd, Object... payloads) {
+            String pattern = (String) payloads[0];
+            // Keep only the lines containing the pattern and count them
+            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
+        }
+    };
+}
+------------------------------------------------------------------------
+
+The RDD definition in Spring could look as follows:
+
+*Spark RDD definition*
+
+[source,java]
+--------------------------------------------------
+@Bean
+JavaRDDLike myRdd(JavaSparkContext sparkContext) {
+  return sparkContext.textFile("testrdd.txt");
+}
+--------------------------------------------------
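+
+The `JavaSparkContext` injected above also has to be available as a
+bean. A minimal sketch, assuming a local master URL (for example for
+testing) and an illustrative application name, could look like this:
+
+[source,java]
+--------------------------------------------------------------
+@Bean
+JavaSparkContext sparkContext() {
+    SparkConf conf = new SparkConf()
+        .setAppName("camel-spark-example") // illustrative name
+        .setMaster("local[*]");            // assumption: local mode
+    return new JavaSparkContext(conf);
+}
+--------------------------------------------------------------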
+
+[[ApacheSpark-VoidRDDcallbacks]]
+Void RDD callbacks
+++++++++++++++++++
+
+If your RDD callback doesn't return any value back to a Camel pipeline,
+you can either return a `null` value or use the `VoidRddCallback` base
+class:
+
+*Spark RDD callback*
+
+[source,java]
+------------------------------------------------------------------
+@Bean
+RddCallback<Void> rddCallback() {
+  return new VoidRddCallback() {
+        @Override
+        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
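+            // 'output' is assumed to be a java.io.File available in the surrounding class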
+            rdd.saveAsTextFile(output.getAbsolutePath());
+        }
+    };
+}
+------------------------------------------------------------------
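+
+Since such a callback returns nothing to the pipeline, the job can be
+triggered with a fire-and-forget send. A small sketch (the `#myRdd`
+registry name is illustrative):
+
+[source,java]
+---------------------------------------------------------------------------------------
+// No reply is expected, so sendBody is sufficient here
+producerTemplate.sendBody("spark:rdd?rdd=#myRdd&rddCallback=#rddCallback", "input payload");
+---------------------------------------------------------------------------------------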
+
+[[ApacheSpark-ConvertingRDDcallbacks]]
+Converting RDD callbacks
+++++++++++++++++++++++++
+
+If you know what type of input data will be sent to the RDD callback,
+you can use the `ConvertingRddCallback` and let Camel automatically
+convert incoming messages before passing them to the callback:
+
+*Spark RDD callback*
+
+[source,java]
+---------------------------------------------------------------------------
+@Bean
+RddCallback<Long> rddCallback(CamelContext context) {
+    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
+        @Override
+        public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
+            return rdd.count() * (int) payloads[0] * (int) payloads[1];
+        }
+    };
+}
+---------------------------------------------------------------------------
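+
+With the converter in place, payloads can be sent in a different type
+and coerced before the callback runs. For example, string payloads could
+then be converted to `int` automatically (the `#myRdd` registry name is
+illustrative):
+
+[source,java]
+--------------------------------------------------------------------------------
+// "10" and "10" are converted to int before reaching the callback
+long result = producerTemplate.requestBody(
+    "spark:rdd?rdd=#myRdd&rddCallback=#rddCallback", Arrays.asList("10", "10"), long.class);
+--------------------------------------------------------------------------------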
+
+[[ApacheSpark-AnnotatedRDDcallbacks]]
+Annotated RDD callbacks
++++++++++++++++++++++++
+
+Probably the easiest way to work with the RDD callbacks is to provide a
+class with a method marked with the `@RddCallback` annotation:
+
+*Annotated RDD callback definition*
+
+[source,java]
+-----------------------------------------------------------------------------------------------------
+import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
+
+@Bean
+RddCallback<Long> rddCallback() {
+    return annotatedRddCallback(new MyTransformation());
+}
+
+...
+
+import org.apache.camel.component.spark.annotation.RddCallback;
+
+public class MyTransformation {
+
+    @RddCallback
+    long countLines(JavaRDD<String> textFile, int first, int second) {
+        return textFile.count() * first * second;
+    }
+
+}
+-----------------------------------------------------------------------------------------------------
+
+If you pass a CamelContext to the annotated RDD callback factory method,
+the created callback will be able to convert incoming payloads to match
+the parameters of the annotated method:
+
+*Body conversions for annotated RDD callbacks*
+
+[source,java]
+------------------------------------------------------------------------------------------------------------------------------
+import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
+
+@Bean
+RddCallback<Long> rddCallback(CamelContext camelContext) {
+    return annotatedRddCallback(new MyTransformation(), camelContext);
+}
+
+...
+
+import org.apache.camel.component.spark.annotation.RddCallback;
+
+public class MyTransformation {
+
+    @RddCallback
+    long countLines(JavaRDD<String> textFile, int first, int second) {
+        return textFile.count() * first * second;
+    }
+
+}
+
+...
+
+// Convert String "10" to integer
+long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);
+------------------------------------------------------------------------------------------------------------------------------
+
+
+[[ApacheSpark-DataFramejobs]]
+DataFrame jobs
+^^^^^^^^^^^^^^
+
+Instead of working with RDDs, the Spark component can work with
+DataFrames as well.
+
+To invoke a DataFrame job, use the following URI:
+
+*Spark DataFrame producer*
+
+[source,java]
+--------------------------------------------------------------------------
+spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
+--------------------------------------------------------------------------
+
+Where the `dataFrame` option refers to the name of a DataFrame instance
+(an instance of `org.apache.spark.sql.DataFrame`) from a Camel registry,
+while `dataFrameCallback` refers to the implementation of the
+`org.apache.camel.component.spark.DataFrameCallback` interface (also
+from a registry). The DataFrame callback provides a single method used
+to apply incoming messages against the given DataFrame. Results of
+callback computations are saved as a body to an exchange.
+
+*Spark DataFrame callback*
+
+[source,java]
+-----------------------------------------------------------
+public interface DataFrameCallback<T> {
+    T onDataFrame(DataFrame dataFrame, Object... payloads);
+}
+-----------------------------------------------------------
+
+The following snippet demonstrates how to send a message as input to a
+job and return results:
+
+*Calling spark job*
+
+[source,java]
+-----------------------------------------------------------------------------------------------------------------------------------------
+String model = "Micra";
+long linesCount = producerTemplate.requestBody("spark:dataframe?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);
+-----------------------------------------------------------------------------------------------------------------------------------------
+
+The DataFrame callback for the snippet above, registered as a Spring
+bean, could look as follows:
+
+*Spark DataFrame callback*
+
+[source,java]
+-------------------------------------------------------------------------------------
+@Bean
+DataFrameCallback<Long> findCarWithModel() {
+    return new DataFrameCallback<Long>() {
+        @Override
+        public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
+            String model = (String) payloads[0];
+            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
+        }
+    };
+}
+-------------------------------------------------------------------------------------
+
+The DataFrame definition in Spring could look as follows:
+
+*Spark DataFrame definition*
+
+[source,java]
+------------------------------------------------------------------------
+@Bean
+DataFrame cars(HiveContext hiveContext) {
+    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
+    jsonCars.registerTempTable("cars");
+    return jsonCars;
+}
+------------------------------------------------------------------------
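+
+The `HiveContext` injected above also needs to be registered as a bean.
+A minimal sketch, assuming a `JavaSparkContext` bean like the one shown
+earlier is available, could be:
+
+[source,java]
+------------------------------------------------------------------------
+@Bean
+HiveContext hiveContext(JavaSparkContext sparkContext) {
+    // HiveContext is built on top of the underlying SparkContext
+    return new HiveContext(sparkContext.sc());
+}
+------------------------------------------------------------------------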
+
+[[ApacheSpark-Hivejobs]]
+Hive jobs
+^^^^^^^^^
+
+Instead of working with RDDs or DataFrames, the Spark component can also
+receive Hive SQL queries as payloads. To send a Hive query to the Spark
+component, use the following URI:
+
+*Spark Hive producer*
+
+[source,java]
+----------
+spark:hive
+----------
+
+The following snippet demonstrates how to send a message as input to a
+job and return results:
+
+*Calling spark job*
+
+[source,java]
+----------------------------------------------------------------------------------------------------
+long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
+List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);
+----------------------------------------------------------------------------------------------------
+
+The table we want to execute the query against should be registered in a
+HiveContext before we query it. For example, in Spring such registration
+could look as follows:
+
+*Spark DataFrame definition*
+
+[source,java]
+------------------------------------------------------------------------
+@Bean
+DataFrame cars(HiveContext hiveContext) {
+    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
+    jsonCars.registerTempTable("cars");
+    return jsonCars;
+}
+------------------------------------------------------------------------
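+
+Putting it together, a route could accept the SQL text from another
+endpoint and forward it to the Hive producer. The `direct:cars-report`
+endpoint below is illustrative:
+
+[source,java]
+-----------------------------------------------------------------------------------------------
+from("direct:cars-report")
+    .to("spark:hive");
+
+...
+
+List<Row> cars = template.requestBody("direct:cars-report", "SELECT * FROM cars", List.class);
+-----------------------------------------------------------------------------------------------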
+
+[[ApacheSpark-SeeAlso]]
+See Also
+^^^^^^^^
+
+* link:configuring-camel.html[Configuring Camel]
+* link:component.html[Component]
+* link:endpoint.html[Endpoint]
+* link:getting-started.html[Getting Started]
+

http://git-wip-us.apache.org/repos/asf/camel/blob/00f72a29/docs/user-manual/en/SUMMARY.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
index 2226bea..7c52fbd 100644
--- a/docs/user-manual/en/SUMMARY.md
+++ b/docs/user-manual/en/SUMMARY.md
@@ -242,6 +242,7 @@
     * [SMPP](smpp.adoc)
     * [SNMP](snmp.adoc)
     * [Solr](solr.adoc)
+    * [Spark](spark.adoc)
     * [Telegram](telegram.adoc)
     * [Twitter](twitter.adoc)
     * [Websocket](websocket.adoc)

http://git-wip-us.apache.org/repos/asf/camel/blob/00f72a29/docs/user-manual/en/apache-spark.data/camel_spark_cluster.png
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/apache-spark.data/camel_spark_cluster.png b/docs/user-manual/en/apache-spark.data/camel_spark_cluster.png
new file mode 100644
index 0000000..c550bf5
Binary files /dev/null and b/docs/user-manual/en/apache-spark.data/camel_spark_cluster.png differ

http://git-wip-us.apache.org/repos/asf/camel/blob/00f72a29/docs/user-manual/en/apache-spark.data/camel_spark_driver.png
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/apache-spark.data/camel_spark_driver.png b/docs/user-manual/en/apache-spark.data/camel_spark_driver.png
new file mode 100644
index 0000000..17b6b67
Binary files /dev/null and b/docs/user-manual/en/apache-spark.data/camel_spark_driver.png differ

http://git-wip-us.apache.org/repos/asf/camel/blob/00f72a29/docs/user-manual/en/apache-spark.data/fabric_docker-(2).png
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/apache-spark.data/fabric_docker-(2).png b/docs/user-manual/en/apache-spark.data/fabric_docker-(2).png
new file mode 100644
index 0000000..5b0fba5
Binary files /dev/null and b/docs/user-manual/en/apache-spark.data/fabric_docker-(2).png differ