Posted to reviews@spark.apache.org by huaxingao <gi...@git.apache.org> on 2016/02/07 00:11:08 UTC

[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

GitHub user huaxingao opened a pull request:

    https://github.com/apache/spark/pull/11104

    [SPARK-13186][Streaming]Migrate away from SynchronizedMap 

    trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huaxingao/spark spark_13186

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11104
    
----
commit 24e75ae7070f82ec847c144a5ba4940736d95503
Author: Huaxin Gao <hu...@us.ibm.com>
Date:   2016-02-02T07:13:50Z

    [SPARK-13186][Streaming]Migrate away from SynchronizedMap

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181656097
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50949/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-182198583
  
    **[Test build #51018 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51018/consoleFull)** for PR 11104 at commit [`cde889e`](https://github.com/apache/spark/commit/cde889e6bea3ee30c4b6108471048a6bec3624e2).
     * This patch **fails PySpark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181666297
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181655868
  
    **[Test build #50949 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50949/consoleFull)** for PR 11104 at commit [`a56f280`](https://github.com/apache/spark/commit/a56f280fdc7367c2cfdd2a6085320a4112b8a461).




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181666298
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50950/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52250939
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -65,12 +67,14 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     
         val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
           ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    -    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
    +    val result = new ConcurrentHashMap[String, Long].asScala
         stream.map(_._2).countByValue().foreachRDD { r =>
           val ret = r.collect()
           ret.toMap.foreach { kv =>
    -        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    -        result.put(kv._1, count)
    +        result.synchronized {
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    --- End diff --
    
    @holdenk @zsxwing 
    I tried `val count = result.putIfAbsent(kv._1, 0) + kv._2`, but the test failed for me. So I will change to mutable.HashMap and put the update in a synchronized block.
    Is it OK to use mutable.HashMap with a synchronized block in this file only, and java.util.concurrent.ConcurrentHashMap in the other files (StreamingListenerSuite, KinesisStreamTests and FileInputDStream)? Or is it better to use mutable.HashMap with a synchronized block in all the files that have SynchronizedMap?




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52247757
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -65,12 +67,14 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     
         val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
           ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    -    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
    +    val result = new ConcurrentHashMap[String, Long].asScala
         stream.map(_._2).countByValue().foreachRDD { r =>
           val ret = r.collect()
           ret.toMap.foreach { kv =>
    -        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    -        result.put(kv._1, count)
    +        result.synchronized {
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    --- End diff --
    
    +1 for using synchronized + mutable.HashMap. In addition, `toMap` in `ret.toMap.foreach` can be removed. Hence I would recommend changing the code to
    
    ```
        val result = new mutable.HashMap[String, Long]()
        stream.map(_._2).countByValue().foreachRDD { r =>
          r.collect().foreach { kv =>
            result.synchronized {
              val count = result.getOrElseUpdate(kv._1, 0) + kv._2
              result.put(kv._1, count)
            }
          }
        }
    ```




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-180889389
  
    Thanks for the PR and getting started on this :)
    
    The first minor thing we can update easily is the import ordering, which in many of the files should follow the style guide https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports
    
    Next, the asScala conversion that's used in many places removes the concurrency guarantees:
    
    >   If the wrapped map is synchronized (e.g. from `java.util.Collections.synchronizedMap`),
    >   it is your responsibility to wrap all 
    >   non-atomic operations with `underlying.synchronized`.
    >   This includes `get`, as `java.util.Map`'s API does not allow for an
    >   atomic `get` when `null` values may be present.
    
    I think that rather than wrapping in underlying.synchronized it might be easier to perform the operations on the Java API directly, as we did in https://github.com/apache/spark/pull/11059 (although it's a bit more painful to code this way).
    
    We should also coordinate with @ted-yu, who I believe is working on corresponding Scala style rules to prevent people from using the unsafe Scala version.
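
As an illustration of calling the Java API directly instead of going through `asScala`, here is a minimal sketch. The object name, the `counts` map and the `register` helper are hypothetical and are not taken from this PR or from PR 11059.

```scala
import java.util.concurrent.ConcurrentHashMap

object JavaApiExample {
  // Hypothetical map; keys and values are illustrative only.
  val counts = new ConcurrentHashMap[String, java.lang.Long]()

  // putIfAbsent is a single atomic call on the Java map. It returns null when
  // the key was absent, otherwise the existing value, which it leaves unchanged.
  def register(key: String): Boolean =
    counts.putIfAbsent(key, java.lang.Long.valueOf(0L)) == null
}
```

Because the call goes straight to `ConcurrentHashMap`, no additional locking is needed for this single operation. Compound read-modify-write sequences still need their own atomicity.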




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52252046
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -65,12 +67,14 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     
         val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
           ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    -    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
    +    val result = new ConcurrentHashMap[String, Long].asScala
         stream.map(_._2).countByValue().foreachRDD { r =>
           val ret = r.collect()
           ret.toMap.foreach { kv =>
    -        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    -        result.put(kv._1, count)
    +        result.synchronized {
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    --- End diff --
    
    Using ConcurrentHashMap in other files looks fine. I don't see any potential issues.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181656096
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-185171737
  
    @huaxingao OK, do you want to close this and try another PR? I can try to take it on too.
    I think this is mostly correct except we'll need more synchronization in places where the wrapped map is iterated over
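
One general pattern for iterating over a wrapped map is sketched below, assuming the wrapper comes from `java.util.Collections.synchronizedMap`. Its Javadoc requires callers to hold the map's own lock while traversing it. The names `IterationExample`, `underlying` and `snapshot` are hypothetical, and the thread does not say which call sites were meant.

```scala
import java.util.{Collections, HashMap => JHashMap}
import scala.collection.JavaConverters._

object IterationExample {
  // Synchronized wrapper around a plain java.util.HashMap.
  val underlying: java.util.Map[String, java.lang.Long] =
    Collections.synchronizedMap(new JHashMap[String, java.lang.Long]())

  // Copy the entries while holding the wrapper's monitor, then let callers
  // iterate over the immutable copy without any lock.
  def snapshot(): Map[String, Long] = underlying.synchronized {
    underlying.asScala.map { case (k, v) => k -> v.longValue }.toMap
  }
}
```

A `ConcurrentHashMap` does not need this lock for iteration, since its iterators are weakly consistent, but they may or may not reflect updates made while the iteration is in progress.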




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52236511
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -65,12 +67,14 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     
         val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
           ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    -    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
    +    val result = new ConcurrentHashMap[String, Long].asScala
         stream.map(_._2).countByValue().foreachRDD { r =>
           val ret = r.collect()
           ret.toMap.foreach { kv =>
    -        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    -        result.put(kv._1, count)
    +        result.synchronized {
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    --- End diff --
    
    So you would probably want to try `val count = result.putIfAbsent(kv._1, 0) + kv._2` - although, looking at the original code, it had a race condition. If we're going to put a synchronized block around the update we could just use a regular `mutable.HashMap`.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181528520
  
    @holdenk 
    Could you please review one more time? 
    I changed to the Java API everywhere except for the getOrElseUpdate in KafkaStreamSuite.scala. I can't find a Java equivalent that can be done in one line, so I used a synchronized block there.
    Thank you very much for your help!!




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-180889472
  
    @huaxingao did a first quick pass :)




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52249862
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -18,7 +18,9 @@
     package org.apache.spark.streaming.dstream
     
     import java.io.{IOException, ObjectInputStream}
    +import java.util.concurrent.ConcurrentHashMap
     
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    Please use `import scala.collection.JavaConverters._`




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52247909
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -65,12 +67,14 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     
         val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
           ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    -    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
    +    val result = new ConcurrentHashMap[String, Long].asScala
         stream.map(_._2).countByValue().foreachRDD { r =>
           val ret = r.collect()
           ret.toMap.foreach { kv =>
    -        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    -        result.put(kv._1, count)
    +        result.synchronized {
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    --- End diff --
    
    And also change `assert` to `assert(sent === result.synchronized { result })`
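
The synchronized update and synchronized read discussed here can be sketched together as follows. This is a variation on the suggestion above, not the code that was committed: it copies the map while holding the lock, so the later comparison runs on a stable immutable copy. The names `SynchronizedCounts`, `add` and `snapshot` are hypothetical.

```scala
import scala.collection.mutable

object SynchronizedCounts {
  val result = new mutable.HashMap[String, Long]()

  // Read-modify-write under the map's monitor so the two steps cannot interleave
  // with another writer.
  def add(key: String, n: Long): Unit = result.synchronized {
    val count = result.getOrElseUpdate(key, 0L) + n
    result.put(key, count)
  }

  // Take an immutable copy under the same monitor before comparing or iterating.
  def snapshot(): Map[String, Long] = result.synchronized { result.toMap }
}
```

A test could then compare against the copy, for example `assert(sent === SynchronizedCounts.snapshot())`, where `sent` is the expected map from the suite.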




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52109403
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -30,6 +30,9 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     
    +import java.util.concurrent.ConcurrentHashMap
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    I think this needs improved import ordering https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52249784
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala ---
    @@ -17,7 +17,10 @@
     
     package org.apache.spark.streaming
     
    -import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    Please use `import scala.collection.JavaConverters._`
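
For reference, a minimal sketch of the requested import in use. The object and value names are hypothetical. `asScala` returns a wrapper backed by the same Java map rather than a copy, so writes through either side are visible to the other.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object ConvertersExample {
  val javaMap = new ConcurrentHashMap[String, String]()

  // A Scala view over javaMap; no entries are copied.
  val view: scala.collection.mutable.Map[String, String] = javaMap.asScala
}
```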




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52228031
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -65,12 +67,14 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     
         val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
           ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    -    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
    +    val result = new ConcurrentHashMap[String, Long].asScala
         stream.map(_._2).countByValue().foreachRDD { r =>
           val ret = r.collect()
           ret.toMap.foreach { kv =>
    -        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    -        result.put(kv._1, count)
    +        result.synchronized {
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    --- End diff --
    
    @holdenk 
    Thanks for your quick reply. 
    I initially changed
        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    to
        result.putIfAbsent(kv._1, 0)
        val count = result.get(kv._1) + kv._2
    but the test failed for me. I guess a different thread can come in between the two lines, so the update is no longer atomic. So I used a synchronized block instead.
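
For comparison, a single-call atomic update is possible with `ConcurrentHashMap.merge`, assuming a Java 8 or later runtime, which this thread does not confirm. The sketch below is not what the PR did, and the names `MergeExample`, `sum` and `add` are hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

object MergeExample {
  val result = new ConcurrentHashMap[String, java.lang.Long]()

  // Explicit BiFunction instance, so the sketch does not rely on
  // lambda-to-SAM conversion in the Scala compiler.
  private val sum = new BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
    override def apply(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
      java.lang.Long.valueOf(a.longValue + b.longValue)
  }

  // merge stores n when the key is absent and otherwise stores sum(old, n);
  // ConcurrentHashMap performs the whole invocation atomically.
  def add(key: String, n: Long): Unit = {
    result.merge(key, java.lang.Long.valueOf(n), sum)
    ()
  }
}
```

This removes the gap between `putIfAbsent` and `get` described above, at the cost of depending on the Java 8 API.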
             




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-182199328
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51018/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181666295
  
    **[Test build #50950 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50950/consoleFull)** for PR 11104 at commit [`5668a79`](https://github.com/apache/spark/commit/5668a79bcb72c2de34a1d2aa7d95984e7d8bf0d2).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181638140
  
    LGTM except some nits. Thanks, @huaxingao




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181632123
  
    By the way, is there any PR removing `SynchronizedSet`?




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-180882891
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181537776
  
    Sure I'll take another look.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao closed the pull request at:

    https://github.com/apache/spark/pull/11104




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-183434809
  
    @srowen 
    Will do. My local branch is messed up. If I can't figure out how to fix it, I will close this PR and submit a new one. Also, one of the Python streaming tests failed with java.net.BindException. I am still trying to figure out the problem.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52118020
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -30,6 +30,9 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     
    +import java.util.concurrent.ConcurrentHashMap
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    What is `decorateAsScala` needed for?




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-180883943
  
    @holdenk 
    Could you please review?  Thanks!!




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52109483
  
    --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -229,8 +231,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
         ssc.checkpoint(checkpointDir)
     
         val awsCredentials = KinesisTestUtils.getAWSCredentials()
    -    val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
    -      with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
    +    val collectedData = new ConcurrentHashMap[Time, (Array[SequenceNumberRanges], Seq[Int])].asScala
    --- End diff --
    
    This conversion doesn't result in a thread safe hashmap sadly. See the comment in the PR for more details.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181650122
  
    ok to test




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52249791
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -19,6 +19,7 @@ package org.apache.spark.streaming
     
     import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
     
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    Please use `import scala.collection.JavaConverters._`
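
As a rough illustration of the suggested import (a sketch only; `ConverterSketch` and `snapshot` are hypothetical names, not code from the PR), `scala.collection.JavaConverters._` supplies an `.asScala` decorator that works on a `ConcurrentHashMap`:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical helper, not part of the PR.
object ConverterSketch {
  // Copies the current contents of a Java concurrent map
  // into an immutable Scala Map.
  def snapshot(data: ConcurrentHashMap[String, Int]): Map[String, Int] =
    data.asScala.toMap
}
```

On Scala 2.13 and later, `JavaConverters` is deprecated in favour of `scala.jdk.CollectionConverters`; the sketch assumes the older Scala versions in use at the time of this PR.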




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181666127
  
    **[Test build #50950 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50950/consoleFull)** for PR 11104 at commit [`5668a79`](https://github.com/apache/spark/commit/5668a79bcb72c2de34a1d2aa7d95984e7d8bf0d2).




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-182199323
  
    Build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52126981
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -30,6 +30,9 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     
    +import java.util.concurrent.ConcurrentHashMap
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    I haven't dug into the code enough to say whether there is any place where it would be safe to not have concurrency guarantees. It's probably easier to just use the safe methods: for ++ you can use putAll, and there is remove as well.
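
A minimal sketch of what replacing `++` and `--` with Java-side calls could look like. The `BulkOpsSketch` object and its method names are hypothetical. On a `ConcurrentHashMap`, the bulk insert method is `putAll`, and keys are removed one at a time with `remove`:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical helpers, not part of the PR.
object BulkOpsSketch {
  // Stand-in for `map ++= entries`: each individual put is thread safe,
  // but the bulk operation as a whole is not atomic.
  def addAll[K, V](target: ConcurrentHashMap[K, V], entries: Map[K, V]): Unit =
    target.putAll(entries.asJava)

  // Stand-in for `map --= keys`: removes the keys one by one.
  def removeAll[K, V](target: ConcurrentHashMap[K, V], keys: Iterable[K]): Unit =
    keys.foreach(k => target.remove(k))
}
```

Other threads may observe the map part-way through either call, so this only helps where per-key safety is enough.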




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181648992
  
    Fixed the problems.  Thank you all very much for your help!!




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181633831
  
    @zsxwing 
    Thanks for the comments. I didn't see a PR for removing SynchronizedSet.  I will work on this. 




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181656095
  
    **[Test build #50949 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50949/consoleFull)** for PR 11104 at commit [`a56f280`](https://github.com/apache/spark/commit/a56f280fdc7367c2cfdd2a6085320a4112b8a461).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181541571
  
    So I think the `putIfAbsent` API might do what you're looking for there.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-182169291
  
    **[Test build #51018 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51018/consoleFull)** for PR 11104 at commit [`cde889e`](https://github.com/apache/spark/commit/cde889e6bea3ee30c4b6108471048a6bec3624e2).




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52249982
  
    --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -17,6 +17,9 @@
     
     package org.apache.spark.streaming.kinesis
     
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    Please use `import scala.collection.JavaConverters._`




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52126950
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -30,6 +30,9 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     
    +import java.util.concurrent.ConcurrentHashMap
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    @holdenk Thanks for your comments.  Yes, that's why I have decorateAsScala there.  
    
    I am working on changing the code to use the Java API for +=, put and getOrElseUpdate. Do we also need concurrency guarantees for ++ and --?




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-185604993
  
    @srowen @holdenk 
    I will close this PR and submit a new one. Thanks!




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181664339
  
    Sorry for the file line length problem.  Fixed. 




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52126187
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -30,6 +30,9 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     
    +import java.util.concurrent.ConcurrentHashMap
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    of course @huaxingao feel free to correct my understanding if I'm off base :)




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-181767095
  
    @huaxingao one more style issue, and it also needs to be updated with the latest master since there are now merge conflicts.




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/11104#issuecomment-183267563
  
    @huaxingao can you rebase this?




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52126175
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -30,6 +30,9 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.{Milliseconds, StreamingContext}
     
    +import java.util.concurrent.ConcurrentHashMap
    +import scala.collection.convert.decorateAsScala._
    --- End diff --
    
    So based on my reading, I think decorateAsScala is being used in place of the standard Java collection conversions to allow updates to the underlying Java backing type. However, this breaks the concurrency guarantees, so it doesn't really buy us anything (though if we were in a situation where concurrency didn't matter and we just wanted to interact with some Java types, it could be nice).
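
One way to read this concern, sketched under assumptions: even when each single call on the wrapped map is safe, a read followed by a separate write is not one atomic step. The hypothetical `LostUpdateSketch` below replays on a single thread an interleaving that two concurrent writers could produce. It is a deterministic illustration, not a reproduction of an observed failure.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical demo, not code from the PR.
object LostUpdateSketch {
  // Replays an interleaving where both writers read the old count
  // before either one writes the new count back.
  def interleaved(): Long = {
    val result = new ConcurrentHashMap[String, Long]().asScala
    val seenByA = result.getOrElseUpdate("k", 0L) // writer A reads 0
    val seenByB = result.getOrElseUpdate("k", 0L) // writer B also reads 0
    result.put("k", seenByA + 1L)                 // A stores 1
    result.put("k", seenByB + 1L)                 // B stores 1, overwriting A's increment
    result("k")                                   // 1, although two increments were attempted
  }
}
```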




[GitHub] spark pull request: [SPARK-13186][Streaming]Migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52215569
  
    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -65,12 +67,14 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     
         val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
           ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    -    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
    +    val result = new ConcurrentHashMap[String, Long].asScala
         stream.map(_._2).countByValue().foreachRDD { r =>
           val ret = r.collect()
           ret.toMap.foreach { kv =>
    -        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    -        result.put(kv._1, count)
    +        result.synchronized {
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    --- End diff --
    
    I think `putIfAbsent` on the underlying Java type might do what you are looking for here. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentHashMap.html#putIfAbsent(K,%20V) 
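
A minimal sketch of that suggestion, assuming the counts live in a plain `ConcurrentHashMap[String, java.lang.Long]` rather than behind the Scala wrapper. The `AtomicCountsSketch` object and `addCount` helper are hypothetical. The helper pairs `putIfAbsent` with the three-argument `replace` in a retry loop, so the read-modify-write needs no `synchronized` block:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec

// Hypothetical helper, not part of the PR.
object AtomicCountsSketch {
  // Adds `delta` to the count stored under `key` without external locking.
  @tailrec
  def addCount(
      counts: ConcurrentHashMap[String, java.lang.Long],
      key: String,
      delta: Long): Unit = {
    // putIfAbsent returns null when the key was absent and delta was stored.
    val previous = counts.putIfAbsent(key, java.lang.Long.valueOf(delta))
    if (previous != null) {
      val updated = java.lang.Long.valueOf(previous.longValue + delta)
      // replace succeeds only if the value is still `previous`;
      // otherwise another thread won the race and we retry.
      if (!counts.replace(key, previous, updated)) addCount(counts, key, delta)
    }
  }
}
```

Inside the `foreachRDD` loop this would be called as `AtomicCountsSketch.addCount(result, kv._1, kv._2)`, assuming `result` is declared with the Java type. On Java 8 and later, `ConcurrentHashMap.merge` is a shorter route to the same effect.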

