Posted to reviews@spark.apache.org by huaxingao <gi...@git.apache.org> on 2016/02/18 09:57:52 UTC

[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

GitHub user huaxingao opened a pull request:

    https://github.com/apache/spark/pull/11250

    [SPARK-13186][Streaming]migrate away from SynchronizedMap

    Trait `SynchronizedMap` in package `mutable` is deprecated: "Synchronization via traits is deprecated as it is inherently unreliable." Change to `java.util.concurrent.ConcurrentHashMap` instead.
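
A minimal sketch of the migration the description proposes, assuming a hypothetical map from batch time (a `Long`) to file names. The deprecated form mixed `mutable.SynchronizedMap` into a `mutable.HashMap`; the replacement uses `java.util.concurrent.ConcurrentHashMap`, whose individual operations such as `put` and `get` are thread-safe. Compound operations that span several calls still need their own coordination.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry: batch time (ms) -> selected file names.
object SynchronizedMapMigration {
  // Deprecated style, kept only as a comment:
  //   new mutable.HashMap[Long, Seq[String]] with mutable.SynchronizedMap[Long, Seq[String]]

  // Replacement: each single put/get on this map is thread-safe on its own.
  val batchToFiles = new ConcurrentHashMap[Long, Seq[String]]()

  def record(time: Long, files: Seq[String]): Unit = {
    batchToFiles.put(time, files)
  }

  // ConcurrentHashMap.get returns null for a missing key; Option(...) maps that to None.
  def filesAt(time: Long): Option[Seq[String]] = Option(batchToFiles.get(time))
}
```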
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huaxingao/spark spark__13186

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11250
    
----
commit ff5e52ae100408e81de7f138b876ec8a47aaafa1
Author: Huaxin Gao <hu...@us.ibm.com>
Date:   2016-02-18T04:25:41Z

    [SPARK-13186][Streaming]migrate away from SynchronizedMap

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53565164
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -613,7 +613,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         def recordedFiles(ssc: StreamingContext): Seq[Int] = {
           val fileInputDStream =
             ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
    --- End diff --
    
    I'm OK with this, but it would be cleaner to make `fileInputDStream.batchTimeToSelectedFiles` a val and reuse it, and to keep the opening brace on the same line as `synchronized`.
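
A sketch of the suggested shape, using a hypothetical `FakeFileInputDStream` that carries only the field under review (values are `Seq[String]` here for simplicity). The map is bound to a local val once, the same object is used as the lock, the opening brace stays on the `synchronized` line, and the result is copied while the lock is held.

```scala
import scala.collection.mutable

// Hypothetical stand-in for FileInputDStream: only the relevant field.
class FakeFileInputDStream {
  val batchTimeToSelectedFiles = new mutable.HashMap[Long, Seq[String]]
}

object RecordedFiles {
  def recordedFiles(stream: FakeFileInputDStream): Seq[Int] = {
    // Bind once, then lock on that same object.
    val batchTimeToSelectedFiles = stream.batchTimeToSelectedFiles
    val filenames = batchTimeToSelectedFiles.synchronized {
      // toList copies the values while the lock is still held.
      batchTimeToSelectedFiles.values.flatten.toList
    }
    // Hypothetical: file names are plain numbers in this sketch.
    filenames.map(_.toInt).sorted
  }
}
```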


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-185608162
  
    @srowen @holdenk 
    
    Could you please take a look at this PR? I ran the Python streaming tests cleanly on my local machine before submitting the PR.
    
    Thanks a lot!!


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53547645
  
    --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -268,9 +270,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     
         // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
         // and return the same data
    -    val times = collectedData.keySet
    +    val times = collectedData.synchronized { collectedData.keySet }
    --- End diff --
    
    I think you'd either have to make a copy of the key set being returned, or synchronize the entire `foreach` block below. The set is (I believe) backed by the collection and can be modified during iteration.
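
Both options can be sketched as follows, assuming a hypothetical `mutable.HashMap[Long, Int]` in place of the test's `collectedData`. In Scala's standard mutable maps, `keySet` is typically a live view of the map, so either the keys are copied while the lock is held, or the whole traversal runs under the lock.

```scala
import scala.collection.mutable

object KeySetSnapshot {
  // Hypothetical shared map: batch time -> record count.
  val collectedData = new mutable.HashMap[Long, Int]

  // Option 1: copy the keys under the lock; callers iterate the copy unlocked.
  def timesSnapshot(): List[Long] = collectedData.synchronized {
    collectedData.keySet.toList.sorted
  }

  // Option 2: hold the lock for the entire foreach.
  def totalRecords(): Int = collectedData.synchronized {
    var total = 0
    collectedData.keySet.foreach { time => total += collectedData(time) }
    total
  }
}
```

Copying keeps the critical section short; holding the lock is simpler when the loop body itself reads the map.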


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-186803513
  
    **[Test build #51627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51627/consoleFull)** for PR 11250 at commit [`3de8e40`](https://github.com/apache/spark/commit/3de8e401484be2178af90ca176bc1b4ffdf90ef8).


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53563513
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -163,8 +166,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       /** Clear the old time-to-files mappings along with old RDDs */
       protected[streaming] override def clearMetadata(time: Time) {
         super.clearMetadata(time)
    -    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
    -    batchTimeToSelectedFiles --= oldFiles.keys
    +    val oldFiles = batchTimeToSelectedFiles.synchronized
    +    {
    +      batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
    --- End diff --
    
    @srowen 
    Thank you very much for your comments. I changed the code to synchronize the whole block.
    The extra parentheses at line 153 seem to be required. Removing them causes a problem.
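
A sketch of why the doubled parentheses in `+= ((validTime, newFiles))` matter, assuming a hypothetical `mutable.HashMap[Long, Seq[String]]`. Written as `+= (k, v)`, the parentheses are read as an argument list holding two separate arguments rather than one key/value tuple, so it does not compile as a single-entry insert. The other two spellings shown avoid the doubled parentheses.

```scala
import scala.collection.mutable

object TupleAppend {
  // Hypothetical map: batch time -> selected files.
  val m = new mutable.HashMap[Long, Seq[String]]

  def addAll(): Unit = {
    // Inner parentheses build one Tuple2; outer ones are the argument list.
    m += ((1000L, Seq("a")))
    // Arrow syntax builds the same Tuple2 without the extra pair.
    m += (2000L -> Seq("b"))
    // Update syntax: sugar for m.update(3000L, Seq("c")).
    m(3000L) = Seq("c")
  }
}
```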


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-186817034
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51627/


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53547651
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -148,7 +148,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         // Find new files
         val newFiles = findNewFiles(validTime.milliseconds)
         logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
    -    batchTimeToSelectedFiles += ((validTime, newFiles))
    +    batchTimeToSelectedFiles.synchronized
    +    {
    --- End diff --
    
    Nit: put this brace on the previous line


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11250


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53296925
  
    --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -241,13 +243,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
         kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
           val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
           val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
    -      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
    +      collectedData.put(time, (kRdd.arrayOfseqNumberRanges, data))
         })
     
         ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
         ssc.start()
     
    -    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
    +    def numBatchesWithData: Int = collectedData.asScala.count(_._2._2.nonEmpty)
    --- End diff --
    
    I think that we have a problem in lines like this, still. I think this is what @holdenk was alluding to. This returns a wrapper on the collection, and then iterates over it to count non-empty elements. But it may be modified by the `put` above while that happens, throwing `ConcurrentModificationException`. We'd have to clone it, or synchronize on the whole object while counting (the latter is probably better).
    
    In that case, it may not add any value to use Java's `ConcurrentHashMap`. Synchronizing access to `mutable.HashMap` is the same and doesn't require using a Java type.
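
A minimal sketch of the alternative described above: a plain `mutable.HashMap` where every access, including the count, runs inside `collectedData.synchronized`. The object name and the `(Seq[String], Seq[Int])` value type are hypothetical stand-ins for the Kinesis test's data.

```scala
import scala.collection.mutable

object SynchronizedCounting {
  // Hypothetical: time -> (sequence-number ranges, data).
  val collectedData = new mutable.HashMap[Long, (Seq[String], Seq[Int])]

  // Writers take the lock for each insert.
  def record(time: Long, ranges: Seq[String], data: Seq[Int]): Unit =
    collectedData.synchronized {
      collectedData(time) = (ranges, data)
    }

  // Counting traverses the map, so the lock is held for the whole traversal;
  // a concurrent record() waits instead of modifying the map mid-iteration.
  def numBatchesWithData: Int = collectedData.synchronized {
    collectedData.count(_._2._2.nonEmpty)
  }
}
```

Because all access goes through the same monitor, no Java concurrent type is needed, which is the point of the comment.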


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-185607707
  
    Can one of the admins verify this patch?


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53547665
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -163,8 +166,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       /** Clear the old time-to-files mappings along with old RDDs */
       protected[streaming] override def clearMetadata(time: Time) {
         super.clearMetadata(time)
    -    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
    -    batchTimeToSelectedFiles --= oldFiles.keys
    +    val oldFiles = batchTimeToSelectedFiles.synchronized
    +    {
    +      batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
    --- End diff --
    
    Same here: I don't think it's safe to use the filtered collection outside the synchronized block. Here, copying the collection result may not be a bad idea.
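
A sketch of `clearMetadata`'s cleanup with both steps under one lock, assuming a hypothetical `mutable.HashMap[Long, Seq[String]]` and a plain `Long` threshold in place of `time - rememberDuration`. Filtering and removing in a single critical section avoids a window between the two steps, and the removed entries are returned as an immutable copy so nothing outside the lock touches the shared map.

```scala
import scala.collection.mutable

object ClearOldEntries {
  // Hypothetical map: batch time (ms) -> selected files.
  val batchTimeToSelectedFiles = new mutable.HashMap[Long, Seq[String]]

  def clearOlderThan(threshold: Long): Map[Long, Seq[String]] =
    batchTimeToSelectedFiles.synchronized {
      // toMap produces an immutable copy of the entries being dropped.
      val oldFiles = batchTimeToSelectedFiles.filter(_._1 < threshold).toMap
      batchTimeToSelectedFiles --= oldFiles.keys
      oldFiles
    }
}
```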


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-186817000
  
    **[Test build #51627 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51627/consoleFull)** for PR 11250 at commit [`3de8e40`](https://github.com/apache/spark/commit/3de8e401484be2178af90ca176bc1b4ffdf90ef8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53430913
  
    --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -241,13 +243,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
         kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
           val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
           val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
    -      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
    +      collectedData.put(time, (kRdd.arrayOfseqNumberRanges, data))
         })
     
         ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
         ssc.start()
     
    -    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
    +    def numBatchesWithData: Int = collectedData.asScala.count(_._2._2.nonEmpty)
    --- End diff --
    
    @srowen 
    Thanks for your comment. For the 5 files I changed, I will remove the usage of Java's ConcurrentHashMap and use mutable.HashMap instead. I will wrap every mutable.HashMap operation in a synchronized block.


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53473423
  
    --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -241,13 +243,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
         kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
           val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
           val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
    -      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
    +      collectedData.put(time, (kRdd.arrayOfseqNumberRanges, data))
         })
     
         ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
         ssc.start()
     
    -    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
    +    def numBatchesWithData: Int = collectedData.asScala.count(_._2._2.nonEmpty)
    --- End diff --
    
    That could work, we can also just use things like `collectedData.values().asScala.count(_._2.nonEmpty)`
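
A sketch of the `values()`-based count, assuming Scala 2.13+ (`scala.jdk.CollectionConverters`; older Scala versions used `scala.collection.JavaConverters`) and a hypothetical `(Seq[String], Seq[Int])` value type. `ConcurrentHashMap`'s views are documented as weakly consistent: iterating `values()` while another thread calls `put` does not throw `ConcurrentModificationException`, although concurrent updates may or may not be reflected in the count.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object ValuesCount {
  // Hypothetical: time -> (sequence-number ranges, data).
  val collectedData = new ConcurrentHashMap[Long, (Seq[String], Seq[Int])]()

  // values() is a live, weakly consistent view; asScala wraps it without copying.
  def numBatchesWithData: Int = collectedData.values().asScala.count(_._2.nonEmpty)
}
```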


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-186817033
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-186800650
  
    Jenkins test this please


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-187097084
  
    Merged to master


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53547655
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -148,7 +148,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         // Find new files
         val newFiles = findNewFiles(validTime.milliseconds)
         logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
    -    batchTimeToSelectedFiles += ((validTime, newFiles))
    +    batchTimeToSelectedFiles.synchronized
    +    {
    +      batchTimeToSelectedFiles += ((validTime, newFiles))
    --- End diff --
    
    I think the extra parentheses around the argument aren't needed? I know they were there before.


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by huaxingao <gi...@git.apache.org>.
Github user huaxingao commented on the pull request:

    https://github.com/apache/spark/pull/11250#issuecomment-187305582
  
    @srowen @holdenk 
    Thank you very much for your help!!


[GitHub] spark pull request: [SPARK-13186][Streaming]migrate away from Sync...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11250#discussion_r53565173
  
    --- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -268,21 +270,23 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     
         // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
         // and return the same data
    -    val times = collectedData.keySet
    -    times.foreach { time =>
    -      val (arrayOfSeqNumRanges, data) = collectedData(time)
    -      val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
    -      rdd shouldBe a [KinesisBackedBlockRDD[_]]
    -
    -      // Verify the recovered sequence ranges
    -      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
    -      assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
    -      arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
    -        assert(expected.ranges.toSeq === found.ranges.toSeq)
    +    collectedData.synchronized {
    +      val times = collectedData.keySet
    +      times.foreach { time =>
    --- End diff --
    
    It's also not necessary that you change this, but it looks like these 3 lines could have been written as:
    
    ```
    collectedData.foreach { case (time, (arrayOfSeqNumRanges, data)) =>
    ```
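
A sketch of that destructuring `foreach`, using a hypothetical `mutable.HashMap[Long, (Seq[String], Seq[Int])]` in place of the real test data. One `case` pattern binds the key and both halves of the value, replacing the separate `keySet`, `foreach`, and `collectedData(time)` lookup, and it still runs inside the synchronized block.

```scala
import scala.collection.mutable

object DestructuringForeach {
  // Hypothetical: time -> (sequence-number ranges, data).
  val collectedData = new mutable.HashMap[Long, (Seq[String], Seq[Int])]

  def describe(): List[String] = collectedData.synchronized {
    val out = mutable.ListBuffer.empty[String]
    // The pattern unpacks (key, (ranges, data)) with no second map lookup.
    collectedData.foreach { case (time, (arrayOfSeqNumRanges, data)) =>
      out += s"$time: ${arrayOfSeqNumRanges.size} ranges, ${data.size} records"
    }
    out.toList.sorted
  }
}
```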

