Posted to commits@spark.apache.org by sr...@apache.org on 2016/12/29 22:03:38 UTC

spark git commit: [SPARK-19003][DOCS] Add Java example in Spark Streaming Guide, section Design Patterns for using foreachRDD

Repository: spark
Updated Branches:
  refs/heads/master 87bc4112c -> dba81e1dc


[SPARK-19003][DOCS] Add Java example in Spark Streaming Guide, section Design Patterns for using foreachRDD

## What changes were proposed in this pull request?

Added the missing Java examples under the section "Design Patterns for using foreachRDD". The section now has examples in all three languages (Scala, Java, and Python), which makes the documentation more consistent.

## How was this patch tested?

Manual.
Generated the docs with the command "SKIP_API=1 jekyll build" and verified the generated HTML page manually.

The syntax of the examples was checked for correctness using sample code on Java 1.7 and Spark 2.2.0-SNAPSHOT.
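
The added examples refer to `Connection` and `ConnectionPool` without defining them, since the guide treats them as placeholders for whatever external system is being written to. To compile or experiment with the snippets, something like the following stand-ins could be used. This is only a minimal sketch: every class, field, and method body here is hypothetical and not part of Spark, and a real pool would also need size limits, connection validation, and cleanup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Small demo: a returned connection is handed out again instead of a new one being built.
final class ConnectionPoolDemo {
  public static void main(String[] args) {
    Connection first = ConnectionPool.getConnection();
    first.send("record-1");
    ConnectionPool.returnConnection(first);

    Connection second = ConnectionPool.getConnection();
    System.out.println("reused: " + (first == second));
    System.out.println("created so far: " + ConnectionPool.createdCount());
    ConnectionPool.returnConnection(second);
  }
}

// Hypothetical stand-in for the Connection type used in the guide's examples.
// It only remembers what was sent; a real one would talk to an external system.
final class Connection {
  final List<String> sent = new ArrayList<>();

  void send(String record) {
    sent.add(record);
  }

  void close() {
    // Nothing to release in this sketch.
  }
}

// Hypothetical static pool whose connections are created lazily, on first demand.
final class ConnectionPool {
  private static final ConcurrentLinkedDeque<Connection> IDLE = new ConcurrentLinkedDeque<>();
  private static final AtomicInteger CREATED = new AtomicInteger();

  private ConnectionPool() {}

  // Hands out the most recently returned idle connection, or creates one if none is idle.
  static Connection getConnection() {
    Connection connection = IDLE.pollFirst();
    if (connection == null) {
      CREATED.incrementAndGet();
      connection = new Connection();
    }
    return connection;
  }

  // Puts a connection back so that a later caller can reuse it.
  static void returnConnection(Connection connection) {
    IDLE.offerFirst(connection);
  }

  // Number of connections this pool has created so far.
  static int createdCount() {
    return CREATED.get();
  }
}
```

Because the pool is static, each executor JVM would hold its own instance, which is what lets connections be reused across partitions and batches on the same executor.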

Author: adesharatushar <tu...@persistent.com>

Closes #16408 from adesharatushar/streaming-doc-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba81e1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba81e1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba81e1d

Branch: refs/heads/master
Commit: dba81e1dcdea1e8bd196c88d4810f9a04312acbf
Parents: 87bc411
Author: adesharatushar <tu...@persistent.com>
Authored: Thu Dec 29 22:03:34 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Dec 29 22:03:34 2016 +0000

----------------------------------------------------------------------
 docs/streaming-programming-guide.md | 72 ++++++++++++++++++++++++++++++++
 1 file changed, 72 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dba81e1d/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 1fcd198..38b4f78 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1246,6 +1246,22 @@ dstream.foreachRDD { rdd =>
 }
 {% endhighlight %}
 </div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+  @Override
+  public void call(JavaRDD<String> rdd) {
+    final Connection connection = createNewConnection(); // executed at the driver
+    rdd.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String record) {
+        connection.send(record); // executed at the worker
+      }
+    });
+  }
+});
+{% endhighlight %}
+</div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
 def sendRecord(rdd):
@@ -1279,6 +1295,23 @@ dstream.foreachRDD { rdd =>
 }
 {% endhighlight %}
 </div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+  @Override
+  public void call(JavaRDD<String> rdd) {
+    rdd.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String record) {
+        Connection connection = createNewConnection();
+        connection.send(record);
+        connection.close();
+      }
+    });
+  }
+});
+{% endhighlight %}
+</div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
 def sendRecord(record):
@@ -1309,6 +1342,25 @@ dstream.foreachRDD { rdd =>
 }
 {% endhighlight %}
 </div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+  @Override
+  public void call(JavaRDD<String> rdd) {
+    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
+      @Override
+      public void call(Iterator<String> partitionOfRecords) {
+        Connection connection = createNewConnection();
+        while (partitionOfRecords.hasNext()) {
+          connection.send(partitionOfRecords.next());
+        }
+        connection.close();
+      }
+    });
+  }
+});
+{% endhighlight %}
+</div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
 def sendPartition(iter):
@@ -1342,6 +1394,26 @@ dstream.foreachRDD { rdd =>
 {% endhighlight %}
 </div>
 
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+  @Override
+  public void call(JavaRDD<String> rdd) {
+    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
+      @Override
+      public void call(Iterator<String> partitionOfRecords) {
+        // ConnectionPool is a static, lazily initialized pool of connections
+        Connection connection = ConnectionPool.getConnection();
+        while (partitionOfRecords.hasNext()) {
+          connection.send(partitionOfRecords.next());
+        }
+        ConnectionPool.returnConnection(connection); // return to the pool for future reuse
+      }
+    });
+  }
+});
+{% endhighlight %}
+</div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
 def sendPartition(iter):
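
The difference between the per-record and per-partition examples in the diff above comes down to how many connections get opened. The following Spark-free sketch illustrates that, modelling an RDD as a plain list of partitions. All names here (`ForeachPatternsSketch`, `CountingConnection`, `perRecord`, `perPartition`) are hypothetical and exist only for illustration; the loops merely imitate what `rdd.foreach` and `rdd.foreachPartition` would do on the executors.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class ForeachPatternsSketch {

  // Hypothetical connection that counts how many instances have been opened.
  static final class CountingConnection {
    static int opened = 0;

    CountingConnection() {
      opened++;
    }

    void send(String record) {
      // A real connection would write the record to an external system.
    }

    void close() {
      // Nothing to release in this sketch.
    }
  }

  // Imitates rdd.foreach: a new connection is opened and closed for every record.
  static int perRecord(List<List<String>> partitions) {
    int before = CountingConnection.opened;
    for (List<String> partition : partitions) {
      for (String record : partition) {
        CountingConnection connection = new CountingConnection();
        connection.send(record);
        connection.close();
      }
    }
    return CountingConnection.opened - before;
  }

  // Imitates rdd.foreachPartition: one connection is shared by all records of a partition.
  static int perPartition(List<List<String>> partitions) {
    int before = CountingConnection.opened;
    for (List<String> partition : partitions) {
      Iterator<String> partitionOfRecords = partition.iterator();
      CountingConnection connection = new CountingConnection();
      while (partitionOfRecords.hasNext()) {
        connection.send(partitionOfRecords.next());
      }
      connection.close();
    }
    return CountingConnection.opened - before;
  }

  public static void main(String[] args) {
    // Two partitions holding five records in total.
    List<List<String>> partitions = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("d", "e"));

    System.out.println("per record:    " + perRecord(partitions) + " connections");
    System.out.println("per partition: " + perPartition(partitions) + " connections");
  }
}
```

With two partitions holding five records, the per-record loop opens five connections and the per-partition loop opens two. The pooled variant in the last hunk goes one step further by reusing connections across partitions and batches.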

