Posted to commits@spark.apache.org by sr...@apache.org on 2016/02/22 10:44:36 UTC

spark git commit: [SPARK-13186][STREAMING] migrate away from SynchronizedMap

Repository: spark
Updated Branches:
  refs/heads/master 39ff15457 -> 8f35d3eac


[SPARK-13186][STREAMING] migrate away from SynchronizedMap

The trait SynchronizedMap in package scala.collection.mutable is deprecated: the deprecation notice warns that synchronization via traits is inherently unreliable and suggests java.util.concurrent.ConcurrentHashMap as an alternative. This change drops the deprecated mixin and instead wraps accesses to plain mutable.HashMap instances in explicit synchronized blocks.

Author: Huaxin Gao <hu...@us.ibm.com>

Closes #11250 from huaxingao/spark__13186.
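
For context, a minimal, self-contained sketch of the pattern this commit applies throughout (the map contents and key names are illustrative, not taken from the diff):

  import scala.collection.mutable

  object SyncMapMigrationSketch {
    def main(args: Array[String]): Unit = {
      // Before (deprecated in Scala 2.11): thread safety mixed in via a trait
      //   new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]

      // After (the pattern used in this commit): a plain HashMap whose accesses are
      // wrapped in synchronized blocks on the map itself, so the read-modify-write
      // below is atomic with respect to other threads.
      val result = new mutable.HashMap[String, Long]()
      result.synchronized {
        val count = result.getOrElseUpdate("word", 0L) + 1L
        result.put("word", count)
      }

      // Readers take the same lock so they observe a consistent snapshot.
      val snapshot = result.synchronized { result.toMap }
      println(snapshot)
    }
  }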


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f35d3ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f35d3ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f35d3ea

Branch: refs/heads/master
Commit: 8f35d3eac9268127512851e52864e64b0bae2f33
Parents: 39ff154
Author: Huaxin Gao <hu...@us.ibm.com>
Authored: Mon Feb 22 09:44:32 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Feb 22 09:44:32 2016 +0000

----------------------------------------------------------------------
 .../streaming/kafka/KafkaStreamSuite.scala      | 13 +++----
 .../streaming/kinesis/KinesisStreamSuite.scala  | 38 +++++++++++---------
 .../streaming/dstream/FileInputDStream.scala    | 30 ++++++++--------
 .../spark/streaming/CheckpointSuite.scala       |  3 +-
 .../streaming/StreamingListenerSuite.scala      | 12 +++++--
 5 files changed, 55 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f35d3ea/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 797b07f..6a35ac1 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -65,19 +65,20 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
 
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
+    val result = new mutable.HashMap[String, Long]()
     stream.map(_._2).countByValue().foreachRDD { r =>
-      val ret = r.collect()
-      ret.toMap.foreach { kv =>
-        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-        result.put(kv._1, count)
+      r.collect().foreach { kv =>
+        result.synchronized {
+          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+          result.put(kv._1, count)
+        }
       }
     }
 
     ssc.start()
 
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(sent === result)
+      assert(result.synchronized { sent === result })
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f35d3ea/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ee6a5f0..ca5d13d 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -230,7 +230,6 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     val awsCredentials = KinesisTestUtils.getAWSCredentials()
     val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
-      with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
 
     val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
       testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -241,13 +240,16 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
       val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
       val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
-      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      collectedData.synchronized {
+        collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      }
     })
 
     ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
     ssc.start()
 
-    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+    def numBatchesWithData: Int =
+      collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
 
     def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
 
@@ -268,21 +270,23 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
     // and return the same data
-    val times = collectedData.keySet
-    times.foreach { time =>
-      val (arrayOfSeqNumRanges, data) = collectedData(time)
-      val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
-      rdd shouldBe a [KinesisBackedBlockRDD[_]]
-
-      // Verify the recovered sequence ranges
-      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
-      assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
-      arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
-        assert(expected.ranges.toSeq === found.ranges.toSeq)
+    collectedData.synchronized {
+      val times = collectedData.keySet
+      times.foreach { time =>
+        val (arrayOfSeqNumRanges, data) = collectedData(time)
+        val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+        rdd shouldBe a[KinesisBackedBlockRDD[_]]
+
+        // Verify the recovered sequence ranges
+        val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+        assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+        arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+          assert(expected.ranges.toSeq === found.ranges.toSeq)
+        }
+
+        // Verify the recovered data
+        assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
       }
-
-      // Verify the recovered data
-      assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
     }
     ssc.stop()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f35d3ea/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 1c23254..a25dada 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -117,7 +117,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   // Map of batch-time to selected file info for the remembered batches
   // This is a concurrent map because it's also accessed in unit tests
   @transient private[streaming] var batchTimeToSelectedFiles =
-    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+    new mutable.HashMap[Time, Array[String]]
 
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -148,7 +148,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     // Find new files
     val newFiles = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
-    batchTimeToSelectedFiles += ((validTime, newFiles))
+    batchTimeToSelectedFiles.synchronized {
+      batchTimeToSelectedFiles += ((validTime, newFiles))
+    }
     recentlySelectedFiles ++= newFiles
     val rdds = Some(filesToRDD(newFiles))
     // Copy newFiles to immutable.List to prevent from being modified by the user
@@ -162,14 +164,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
 
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
-    super.clearMetadata(time)
-    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
-    batchTimeToSelectedFiles --= oldFiles.keys
-    recentlySelectedFiles --= oldFiles.values.flatten
-    logInfo("Cleared " + oldFiles.size + " old files that were older than " +
-      (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
-    logDebug("Cleared files are:\n" +
-      oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+    batchTimeToSelectedFiles.synchronized {
+      val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
+      batchTimeToSelectedFiles --= oldFiles.keys
+      recentlySelectedFiles --= oldFiles.values.flatten
+      logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+        (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+      logDebug("Cleared files are:\n" +
+        oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+    }
     // Delete file mod times that weren't accessed in the last round of getting new files
     fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
   }
@@ -307,8 +310,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
-    batchTimeToSelectedFiles =
-      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+    batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
@@ -324,7 +326,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
 
     override def update(time: Time) {
       hadoopFiles.clear()
-      hadoopFiles ++= batchTimeToSelectedFiles
+      batchTimeToSelectedFiles.synchronized { hadoopFiles ++= batchTimeToSelectedFiles }
     }
 
     override def cleanup(time: Time) { }
@@ -335,7 +337,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
           // Restore the metadata in both files and generatedRDDs
           logInfo("Restoring files for time " + t + " - " +
             f.mkString("[", ", ", "]") )
-          batchTimeToSelectedFiles += ((t, f))
+          batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
           recentlySelectedFiles ++= f
           generatedRDDs += ((t, filesToRDD(f)))
         }
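
The FileInputDStream change above keeps the plain HashMap and adds synchronized blocks; the deprecation notice instead points at java.util.concurrent.ConcurrentHashMap. A hypothetical sketch of that alternative for a batch-time-to-files map (not part of this commit; the key type is simplified to Long millis rather than Spark's Time):

  import java.util.concurrent.ConcurrentHashMap
  import scala.collection.JavaConverters._

  object ConcurrentMapSketch {
    def main(args: Array[String]): Unit = {
      // Hypothetical alternative to HashMap + synchronized for a batch-time map;
      // batch times are simplified to Long millis instead of Spark's Time class.
      val batchTimeToSelectedFiles = new ConcurrentHashMap[Long, Array[String]]()

      // Single-key writes and removals are thread-safe without external locking.
      batchTimeToSelectedFiles.put(3000L, Array("hdfs://dir/file-3000"))
      batchTimeToSelectedFiles.put(4000L, Array("hdfs://dir/file-4000"))

      // Compound operations ("find everything older than the cutoff, then drop it")
      // still need care: each call is atomic, but the sequence as a whole is not.
      val cutoff = 3500L
      val old = batchTimeToSelectedFiles.asScala.filter { case (t, _) => t < cutoff }
      old.keys.foreach(t => batchTimeToSelectedFiles.remove(t))

      println(batchTimeToSelectedFiles.asScala.keys.toSeq.sorted)  // List(4000)
    }
  }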

http://git-wip-us.apache.org/repos/asf/spark/blob/8f35d3ea/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 1f0245a..dada495 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -613,7 +613,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     def recordedFiles(ssc: StreamingContext): Seq[Int] = {
       val fileInputDStream =
         ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+      val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
+         { fileInputDStream.batchTimeToSelectedFiles.values.flatten }
       filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f35d3ea/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 66f4739..6c60652 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -270,7 +270,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       }
     }
     _ssc.stop()
-    failureReasonsCollector.failureReasons.toMap
+    failureReasonsCollector.failureReasons.synchronized
+    {
+      failureReasonsCollector.failureReasons.toMap
+    }
   }
 
   /** Check if a sequence of numbers is in increasing order */
@@ -354,12 +357,15 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
  */
 class FailureReasonsCollector extends StreamingListener {
 
-  val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
+  val failureReasons = new HashMap[Int, String]
 
   override def onOutputOperationCompleted(
       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
     outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
-      failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+      failureReasons.synchronized
+      {
+        failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+      }
     }
   }
 }
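
The same write-under-lock / read-under-lock pairing used in StreamingListenerSuite can be packaged as a small collector; a minimal standalone sketch of that idiom (class and method names here are illustrative, not Spark APIs):

  import scala.collection.mutable

  // Writers and readers of the shared HashMap both synchronize on the map, so the
  // immutable snapshot handed back to the test reflects a consistent state.
  class ReasonCollectorSketch {
    private val failureReasons = new mutable.HashMap[Int, String]

    def record(id: Int, reason: String): Unit =
      failureReasons.synchronized { failureReasons(id) = reason }

    def snapshot(): Map[Int, String] =
      failureReasons.synchronized { failureReasons.toMap }
  }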


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org