You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2015/12/18 20:54:09 UTC

[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/10385

    [SPARK-12429][Streaming][Doc]Add Accumulator and Broadcast Scala example for Streaming

    This PR only contains Scala example right now.
    
    TODO
    - [ ] Java example
    - [ ] Python example

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark accumulator-broadcast-example

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10385.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10385
    
----
commit 9928ca52457d8c951727461175ce37757cf1bb05
Author: Shixiong Zhu <sh...@databricks.com>
Date:   2015-12-18T19:50:47Z

    Add Accumulator and Broadcast Scala example for Streaming

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc][WIP]Add Accumula...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165924666
  
    **[Test build #48038 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48038/consoleFull)** for PR 10385 at commit [`78d15bd`](https://github.com/apache/spark/commit/78d15bd7e343c4d6d07dcf97df76a6a8e6e8fe61).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166768053
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48222/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48078831
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1415,6 +1415,95 @@ Note that the connections in the pool should be lazily created on demand and tim
     
     ***
     
    +## Accumulator and Broadcast
    +
    +Accumulator and Broadcast cannot be recovered from checkpoint in Streaming. If you enable checkpoint and use Accumulator or Broadcast as well, you have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
    --- End diff --
    
    I'd say: "in **Spark** Streaming. If you enable checkpoint**ing** and use an Accumulator or Broadcast as well, you**'ll** have to create ..."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc][WIP]Add Accumula...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165924754
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by BenFradet <gi...@git.apache.org>.
Github user BenFradet commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166427396
  
    lgtm


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310590
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1415,6 +1415,185 @@ Note that the connections in the pool should be lazily created on demand and tim
     
     ***
     
    +## Accumulator and Broadcast
    +
    +Accumulator and Broadcast cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulator or Broadcast as well, you'll have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +object WordBlacklist {
    +
    +  @volatile private var instance: Broadcast[Seq[String]] = null
    +
    +  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    +    if (instance == null) {
    +      synchronized {
    +        if (instance == null) {
    +          val wordBlacklist = Seq("a", "b", "c")
    +          instance = sc.broadcast(wordBlacklist)
    +        }
    +      }
    +    }
    +    instance
    +  }
    +}
    +
    +object DroppedWordsCounter {
    +
    +  @volatile private var instance: Accumulator[Long] = null
    +
    +  def getInstance(sc: SparkContext): Accumulator[Long] = {
    +    if (instance == null) {
    +      synchronized {
    +        if (instance == null) {
    +          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
    +        }
    +      }
    +    }
    +    instance
    +  }
    +}
    +
    +wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
    +  // Get or register the blacklist Broadcast
    +  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
    +  // Get or register the droppedWordsCounter Accumulator
    +  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
    +  // Use blacklist to drop words and use droppedWordsCounter to count them
    +  val counts = rdd.filter { case (word, count) =>
    +    if (blacklist.value.contains(word)) {
    +      droppedWordsCounter += 1
    +      false
    +    } else {
    +      true
    +    }
    +  }.collect().mkString("[", ", ", "]")
    +  val output = "Counts at time " + time + " " + counts
    +  println(output)
    +  println("Dropped " + droppedWordsCounter.value + " word(s) totally")
    +  println("Appending to " + outputFile.getAbsolutePath)
    +  Files.append(output + "\n", outputFile, Charset.defaultCharset())
    +})
    +
    +{% endhighlight %}
    +
    +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +class JavaWordBlacklist {
    +
    +  private static volatile Broadcast<List<String>> instance = null;
    +
    +  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    +    if (instance == null) {
    +      synchronized (WordBlacklist.class) {
    +        if (instance == null) {
    +          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
    +          instance = jsc.broadcast(wordBlacklist);
    +        }
    +      }
    +    }
    +    return instance;
    +  }
    +}
    +
    +class JavaDroppedWordsCounter {
    +
    +  private static volatile Accumulator<Integer> instance = null;
    +
    +  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
    +    if (instance == null) {
    +      synchronized (DroppedWordsCounter.class) {
    +        if (instance == null) {
    +          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
    +        }
    +      }
    +    }
    +    return instance;
    +  }
    +}
    +
    +wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
    +  @Override
    +  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
    +    // Get or register the blacklist Broadcast
    +    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    +    // Get or register the droppedWordsCounter Accumulator
    +    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    +    // Use blacklist to drop words and use droppedWordsCounter to count them
    +    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
    +      @Override
    +      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
    +        if (blacklist.value().contains(wordCount._1())) {
    +          droppedWordsCounter.add(wordCount._2());
    +          return false;
    +        } else {
    +          return true;
    +        }
    +      }
    +    }).collect().toString();
    +    String output = "Counts at time " + time + " " + counts;
    +    System.out.println(output);
    +    System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
    --- End diff --
    
    remove unnecessary stuff.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310218
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1415,6 +1415,185 @@ Note that the connections in the pool should be lazily created on demand and tim
     
     ***
     
    +## Accumulator and Broadcast
    +
    +Accumulator and Broadcast cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulator or Broadcast as well, you'll have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
    --- End diff --
    
    I believe its called "Broadcast variable"
    
    http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
    
    Might be a good idea to put links to the Spark programming guide 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166763071
  
    Small comments, other LGTM. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10385


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48311290
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1415,6 +1415,185 @@ Note that the connections in the pool should be lazily created on demand and tim
     
     ***
     
    +## Accumulator and Broadcast
    +
    +Accumulator and Broadcast cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulator or Broadcast as well, you'll have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +object WordBlacklist {
    +
    +  @volatile private var instance: Broadcast[Seq[String]] = null
    +
    +  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    +    if (instance == null) {
    +      synchronized {
    +        if (instance == null) {
    +          val wordBlacklist = Seq("a", "b", "c")
    +          instance = sc.broadcast(wordBlacklist)
    +        }
    +      }
    +    }
    +    instance
    +  }
    +}
    +
    +object DroppedWordsCounter {
    +
    +  @volatile private var instance: Accumulator[Long] = null
    --- End diff --
    
    Oh, no. We need `sc` here. So it cannot be a val.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310169
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1415,6 +1415,185 @@ Note that the connections in the pool should be lazily created on demand and tim
     
     ***
     
    +## Accumulator and Broadcast
    --- End diff --
    
    Accumulators and Broadcast variables


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166393943
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48120/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165889117
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc][WIP]Add Accumula...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165924756
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48038/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310785
  
    --- Diff: examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java ---
    @@ -41,7 +46,48 @@
     import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
     
     /**
    - * Counts words in text encoded with UTF8 received from the network every second.
    + * Use this singleton to get or register `Broadcast`.
    + */
    +class JavaWordBlacklist {
    +
    +  private static volatile Broadcast<List<String>> instance = null;
    +
    +  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    +    if (instance == null) {
    +      synchronized (WordBlacklist.class) {
    +        if (instance == null) {
    +          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
    +          instance = jsc.broadcast(wordBlacklist);
    +        }
    +      }
    +    }
    +    return instance;
    +  }
    +}
    +
    +/**
    + * Use this singleton to get or register `Accumulator`.
    --- End diff --
    
    Similar comment as above for Broadcast.. "an accumulator"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165888992
  
    **[Test build #48018 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48018/consoleFull)** for PR 10385 at commit [`9928ca5`](https://github.com/apache/spark/commit/9928ca52457d8c951727461175ce37757cf1bb05).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:\n  * `public class JavaTwitterHashTagJoinSentiments `\n  * `case class UnresolvedAlias(child: Expression, aliasName: Option[String] = None)`\n


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165885369
  
    **[Test build #48018 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48018/consoleFull)** for PR 10385 at commit [`9928ca5`](https://github.com/apache/spark/commit/9928ca52457d8c951727461175ce37757cf1bb05).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310738
  
    --- Diff: examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java ---
    @@ -41,7 +46,48 @@
     import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
     
     /**
    - * Counts words in text encoded with UTF8 received from the network every second.
    + * Use this singleton to get or register `Broadcast`.
    --- End diff --
    
    "broadcast variable".
    why is Broadcast in single quotes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48314002
  
    --- Diff: docs/programming-guide.md ---
    @@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may
     
     What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.  
     
    -To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.  
    --- End diff --
    
    nice!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166420976
  
    Added Java and Python examples.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166768005
  
    **[Test build #48222 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48222/consoleFull)** for PR 10385 at commit [`455968a`](https://github.com/apache/spark/commit/455968a35d22978b9f6e85e34687615f40734708).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:\n  * `class JavaWordBlacklist `\n  * `class JavaDroppedWordsCounter `\n


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165889120
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48018/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166393941
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310571
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1415,6 +1415,185 @@ Note that the connections in the pool should be lazily created on demand and tim
     
     ***
     
    +## Accumulator and Broadcast
    +
    +Accumulator and Broadcast cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulator or Broadcast as well, you'll have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +object WordBlacklist {
    +
    +  @volatile private var instance: Broadcast[Seq[String]] = null
    +
    +  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    +    if (instance == null) {
    +      synchronized {
    +        if (instance == null) {
    +          val wordBlacklist = Seq("a", "b", "c")
    +          instance = sc.broadcast(wordBlacklist)
    +        }
    +      }
    +    }
    +    instance
    +  }
    +}
    +
    +object DroppedWordsCounter {
    +
    +  @volatile private var instance: Accumulator[Long] = null
    --- End diff --
    
    For scala, cant this whole thing be replaced with lazy val?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166768051
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310806
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala ---
    @@ -23,13 +23,55 @@ import java.nio.charset.Charset
     
     import com.google.common.io.Files
     
    -import org.apache.spark.SparkConf
    +import org.apache.spark.{Accumulator, SparkConf, SparkContext}
    +import org.apache.spark.broadcast.Broadcast
     import org.apache.spark.rdd.RDD
     import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
     import org.apache.spark.util.IntParam
     
     /**
    - * Counts words in text encoded with UTF8 received from the network every second.
    + * Use this singleton to get or register `Broadcast`.
    --- End diff --
    
    Same comments as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48312104
  
    --- Diff: docs/programming-guide.md ---
    @@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may
     
     What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.  
     
    -To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.  
    --- End diff --
    
    I also fixed the broken `Accumulator` link in `programming-guide.md`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310820
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala ---
    @@ -23,13 +23,55 @@ import java.nio.charset.Charset
     
     import com.google.common.io.Files
     
    -import org.apache.spark.SparkConf
    +import org.apache.spark.{Accumulator, SparkConf, SparkContext}
    +import org.apache.spark.broadcast.Broadcast
     import org.apache.spark.rdd.RDD
     import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
     import org.apache.spark.util.IntParam
     
     /**
    - * Counts words in text encoded with UTF8 received from the network every second.
    + * Use this singleton to get or register `Broadcast`.
    + */
    +object WordBlacklist {
    +
    +  @volatile private var instance: Broadcast[Seq[String]] = null
    +
    +  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    +    if (instance == null) {
    +      synchronized {
    +        if (instance == null) {
    +          val wordBlacklist = Seq("a", "b", "c")
    +          instance = sc.broadcast(wordBlacklist)
    +        }
    +      }
    +    }
    +    instance
    +  }
    +}
    +
    +/**
    + * Use this singleton to get or register `Accumulator`.
    + */
    +object DroppedWordsCounter {
    +
    +  @volatile private var instance: Accumulator[Long] = null
    --- End diff --
    
    same comment as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166388556
  
    **[Test build #48120 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48120/consoleFull)** for PR 10385 at commit [`9e241e7`](https://github.com/apache/spark/commit/9e241e791e5ce2cb0cc3dd7c609339ca8ea11129).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165883963
  
    @tdas could you take a look before I start to add Java and Python examples?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310515
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1415,6 +1415,185 @@ Note that the connections in the pool should be lazily created on demand and tim
     
     ***
     
    +## Accumulator and Broadcast
    +
    +Accumulator and Broadcast cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulator or Broadcast as well, you'll have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +object WordBlacklist {
    +
    +  @volatile private var instance: Broadcast[Seq[String]] = null
    +
    +  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    +    if (instance == null) {
    +      synchronized {
    +        if (instance == null) {
    +          val wordBlacklist = Seq("a", "b", "c")
    +          instance = sc.broadcast(wordBlacklist)
    +        }
    +      }
    +    }
    +    instance
    +  }
    +}
    +
    +object DroppedWordsCounter {
    +
    +  @volatile private var instance: Accumulator[Long] = null
    +
    +  def getInstance(sc: SparkContext): Accumulator[Long] = {
    +    if (instance == null) {
    +      synchronized {
    +        if (instance == null) {
    +          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
    +        }
    +      }
    +    }
    +    instance
    +  }
    +}
    +
    +wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
    +  // Get or register the blacklist Broadcast
    +  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
    +  // Get or register the droppedWordsCounter Accumulator
    +  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
    +  // Use blacklist to drop words and use droppedWordsCounter to count them
    +  val counts = rdd.filter { case (word, count) =>
    +    if (blacklist.value.contains(word)) {
    +      droppedWordsCounter += 1
    +      false
    +    } else {
    +      true
    +    }
    +  }.collect().mkString("[", ", ", "]")
    +  val output = "Counts at time " + time + " " + counts
    +  println(output)
    --- End diff --
    
    No need to give so much code. remove all the unnecessary cosmetic stuff like "mkString" and "print"  and Files.append ....
    
    Same for all the other code.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166393679
  
    **[Test build #48120 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48120/consoleFull)** for PR 10385 at commit [`9e241e7`](https://github.com/apache/spark/commit/9e241e791e5ce2cb0cc3dd7c609339ca8ea11129).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:\n  * `class JavaWordBlacklist `\n  * `class JavaDroppedWordsCounter `\n


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc][WIP]Add Accumula...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-165922051
  
    **[Test build #48038 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48038/consoleFull)** for PR 10385 at commit [`78d15bd`](https://github.com/apache/spark/commit/78d15bd7e343c4d6d07dcf97df76a6a8e6e8fe61).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10385#issuecomment-166766532
  
    **[Test build #48222 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48222/consoleFull)** for PR 10385 at commit [`455968a`](https://github.com/apache/spark/commit/455968a35d22978b9f6e85e34687615f40734708).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12429][Streaming][Doc]Add Accumulator a...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10385#discussion_r48310677
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -1415,6 +1415,185 @@ Note that the connections in the pool should be lazily created on demand and tim
     
     ***
     
    +## Accumulator and Broadcast
    +
    +Accumulator and Broadcast cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulator or Broadcast as well, you'll have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +object WordBlacklist {
    +
    +  @volatile private var instance: Broadcast[Seq[String]] = null
    +
    +  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    +    if (instance == null) {
    +      synchronized {
    +        if (instance == null) {
    +          val wordBlacklist = Seq("a", "b", "c")
    +          instance = sc.broadcast(wordBlacklist)
    +        }
    +      }
    +    }
    +    instance
    +  }
    +}
    +
    +object DroppedWordsCounter {
    +
    +  @volatile private var instance: Accumulator[Long] = null
    --- End diff --
    
    Yes. Good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org