You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/12/23 01:39:14 UTC

spark git commit: [SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming

Repository: spark
Updated Branches:
  refs/heads/master 93db50d1c -> 20591afd7


[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for Streaming

This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #10385 from zsxwing/accumulator-broadcast-example.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20591afd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20591afd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20591afd

Branch: refs/heads/master
Commit: 20591afd790799327f99485c5a969ed7412eca45
Parents: 93db50d
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Dec 22 16:39:10 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Dec 22 16:39:10 2015 -0800

----------------------------------------------------------------------
 docs/programming-guide.md                       |   6 +-
 docs/streaming-programming-guide.md             | 165 +++++++++++++++++++
 .../JavaRecoverableNetworkWordCount.java        |  71 +++++++-
 .../streaming/recoverable_network_wordcount.py  |  30 +++-
 .../streaming/RecoverableNetworkWordCount.scala |  66 +++++++-
 5 files changed, 325 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20591afd/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index c5e2a1c..bad25e6 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may
 
 What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.  
 
-To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.  
+To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.  
 
 In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
 
@@ -1091,7 +1091,7 @@ for details.
 </tr>
 <tr>
   <td> <b>foreach</b>(<i>func</i>) </td>
-  <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
+  <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#accumulators">Accumulator</a> or interacting with external storage systems.
   <br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#ClosuresLink">Understanding closures </a> for more details.</td>
 </tr>
 </table>
@@ -1338,7 +1338,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad
 `v` should not be modified after it is broadcast in order to ensure that all nodes get the same
 value of the broadcast variable (e.g. if the variable is shipped to a new node later).
 
-## Accumulators <a name="AccumLink"></a>
+## Accumulators
 
 Accumulators are variables that are only "added" to through an associative operation and can
 therefore be efficiently supported in parallel. They can be used to implement counters (as in

http://git-wip-us.apache.org/repos/asf/spark/blob/20591afd/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index ed6b28c..3b071c7 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily created on demand and tim
 
 ***
 
+## Accumulators and Broadcast Variables
+
+[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use [Accumulators](programming-guide.html#accumulators) or [Broadcast variables](programming-guide.html#broadcast-variables) as well, you'll have to create lazily instantiated singleton instances for [Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+object DroppedWordsCounter {
+
+  @volatile private var instance: Accumulator[Long] = null
+
+  def getInstance(sc: SparkContext): Accumulator[Long] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+  // Get or register the blacklist Broadcast
+  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+  // Get or register the droppedWordsCounter Accumulator
+  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+  // Use blacklist to drop words and use droppedWordsCounter to count them
+  val counts = rdd.filter { case (word, count) =>
+    if (blacklist.value.contains(word)) {
+      droppedWordsCounter += count
+      false
+    } else {
+      true
+    }
+  }.collect()
+  val output = "Counts at time " + time + " " + counts
+})
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+  @Override
+  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+    // Get or register the blacklist Broadcast
+    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+    // Get or register the droppedWordsCounter Accumulator
+    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    // Use blacklist to drop words and use droppedWordsCounter to count them
+    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }).collect().toString();
+    String output = "Counts at time " + time + " " + counts;
+  }
+}
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+def echo(time, rdd):
+    # Get or register the blacklist Broadcast
+    blacklist = getWordBlacklist(rdd.context)
+    # Get or register the droppedWordsCounter Accumulator
+    droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+    # Use blacklist to drop words and use droppedWordsCounter to count them
+    def filterFunc(wordCount):
+        if wordCount[0] in blacklist.value:
+            droppedWordsCounter.add(wordCount[1])
+            False
+        else:
+            True
+
+    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
+
+wordCounts.foreachRDD(echo)
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+
+</div>
+</div>
+
+***
+
 ## DataFrame and SQL Operations
 You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20591afd/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bceda97..90d4737 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,17 +21,22 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
+import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -41,7 +46,48 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
 
 /**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register a Broadcast variable.
+ */
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+/**
+ * Use this singleton to get or register an Accumulator.
+ */
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second. This example also
+ * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
+ * they can be registered on driver failures.
  *
  * Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
@@ -111,10 +157,27 @@ public final class JavaRecoverableNetworkWordCount {
     wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
       @Override
       public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-        String counts = "Counts at time " + time + " " + rdd.collect();
-        System.out.println(counts);
+        // Get or register the blacklist Broadcast
+        final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+        // Get or register the droppedWordsCounter Accumulator
+        final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+        // Use blacklist to drop words and use droppedWordsCounter to count them
+        String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+          @Override
+          public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+            if (blacklist.value().contains(wordCount._1())) {
+              droppedWordsCounter.add(wordCount._2());
+              return false;
+            } else {
+              return true;
+            }
+          }
+        }).collect().toString();
+        String output = "Counts at time " + time + " " + counts;
+        System.out.println(output);
+        System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
         System.out.println("Appending to " + outputFile.getAbsolutePath());
-        Files.append(counts + "\n", outputFile, Charset.defaultCharset());
+        Files.append(output + "\n", outputFile, Charset.defaultCharset());
         return null;
       }
     });

http://git-wip-us.apache.org/repos/asf/spark/blob/20591afd/examples/src/main/python/streaming/recoverable_network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index ac91f0a..52b2639 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -44,6 +44,20 @@ from pyspark import SparkContext
 from pyspark.streaming import StreamingContext
 
 
+# Get or register a Broadcast variable
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+
+# Get or register an Accumulator
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+
 def createContext(host, port, outputPath):
     # If you do not see this printed, that means the StreamingContext has been loaded
     # from the new checkpoint
@@ -60,8 +74,22 @@ def createContext(host, port, outputPath):
     wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
 
     def echo(time, rdd):
-        counts = "Counts at time %s %s" % (time, rdd.collect())
+        # Get or register the blacklist Broadcast
+        blacklist = getWordBlacklist(rdd.context)
+        # Get or register the droppedWordsCounter Accumulator
+        droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+        # Use blacklist to drop words and use droppedWordsCounter to count them
+        def filterFunc(wordCount):
+            if wordCount[0] in blacklist.value:
+                droppedWordsCounter.add(wordCount[1])
+                False
+            else:
+                True
+
+        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
         print(counts)
+        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
         print("Appending to " + os.path.abspath(outputPath))
         with open(outputPath, 'a') as f:
             f.write(counts + "\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/20591afd/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 9916882..38d4fd1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -23,13 +23,55 @@ import java.nio.charset.Charset
 
 import com.google.common.io.Files
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{Accumulator, SparkConf, SparkContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
 import org.apache.spark.util.IntParam
 
 /**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register a Broadcast variable.
+ */
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+/**
+ * Use this singleton to get or register an Accumulator.
+ */
+object DroppedWordsCounter {
+
+  @volatile private var instance: Accumulator[Long] = null
+
+  def getInstance(sc: SparkContext): Accumulator[Long] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second. This example also
+ * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
+ * they can be registered on driver failures.
  *
  * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
@@ -75,10 +117,24 @@ object RecoverableNetworkWordCount {
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
-      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
-      println(counts)
+      // Get or register the blacklist Broadcast
+      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+      // Get or register the droppedWordsCounter Accumulator
+      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+      // Use blacklist to drop words and use droppedWordsCounter to count them
+      val counts = rdd.filter { case (word, count) =>
+        if (blacklist.value.contains(word)) {
+          droppedWordsCounter += count
+          false
+        } else {
+          true
+        }
+      }.collect().mkString("[", ", ", "]")
+      val output = "Counts at time " + time + " " + counts
+      println(output)
+      println("Dropped " + droppedWordsCounter.value + " word(s) totally")
       println("Appending to " + outputFile.getAbsolutePath)
-      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+      Files.append(output + "\n", outputFile, Charset.defaultCharset())
     })
     ssc
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org