You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/14 10:13:16 UTC

spark git commit: [SPARK-13746][TESTS] stop using deprecated SynchronizedSet

Repository: spark
Updated Branches:
  refs/heads/master acdf21970 -> 31d069d4c


[SPARK-13746][TESTS] stop using deprecated SynchronizedSet

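The trait `SynchronizedSet` in package `scala.collection.mutable` is deprecated, so the affected test suites now use a plain `HashSet` and guard each access with an explicit `synchronized` block.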

Author: Wilson Wu <wi...@gmail.com>

Closes #11580 from wilson888888888/spark-synchronizedset.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31d069d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31d069d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31d069d4

Branch: refs/heads/master
Commit: 31d069d4c2956306355d14087ca74ce1e6705217
Parents: acdf219
Author: Wilson Wu <wi...@gmail.com>
Authored: Mon Mar 14 09:13:29 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Mar 14 09:13:29 2016 +0000

----------------------------------------------------------------------
 .../org/apache/spark/ContextCleanerSuite.scala  | 41 ++++++++++++--------
 .../streaming/kinesis/KinesisStreamSuite.scala  | 22 +++++++----
 2 files changed, 39 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/31d069d4/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index d1e806b..e60678b 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.lang.ref.WeakReference
 
-import scala.collection.mutable.{HashSet, SynchronizedSet}
+import scala.collection.mutable.HashSet
 import scala.language.existentials
 import scala.util.Random
 
@@ -442,25 +442,25 @@ class CleanerTester(
     checkpointIds: Seq[Long] = Seq.empty)
   extends Logging {
 
-  val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
-  val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
-  val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
-  val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
+  val toBeCleanedRDDIds = new HashSet[Int] ++= rddIds
+  val toBeCleanedShuffleIds = new HashSet[Int] ++= shuffleIds
+  val toBeCleanedBroadcstIds = new HashSet[Long] ++= broadcastIds
+  val toBeCheckpointIds = new HashSet[Long] ++= checkpointIds
   val isDistributed = !sc.isLocal
 
   val cleanerListener = new CleanerListener {
     def rddCleaned(rddId: Int): Unit = {
-      toBeCleanedRDDIds -= rddId
+      toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds -= rddId }
       logInfo("RDD " + rddId + " cleaned")
     }
 
     def shuffleCleaned(shuffleId: Int): Unit = {
-      toBeCleanedShuffleIds -= shuffleId
+      toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds -= shuffleId }
       logInfo("Shuffle " + shuffleId + " cleaned")
     }
 
     def broadcastCleaned(broadcastId: Long): Unit = {
-      toBeCleanedBroadcstIds -= broadcastId
+      toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds -= broadcastId }
       logInfo("Broadcast " + broadcastId + " cleaned")
     }
 
@@ -469,7 +469,7 @@ class CleanerTester(
     }
 
     def checkpointCleaned(rddId: Long): Unit = {
-      toBeCheckpointIds -= rddId
+      toBeCheckpointIds.synchronized { toBeCheckpointIds -= rddId }
       logInfo("checkpoint  " + rddId + " cleaned")
     }
   }
@@ -578,18 +578,27 @@ class CleanerTester(
   }
 
   private def uncleanedResourcesToString = {
+    val s1 = toBeCleanedRDDIds.synchronized {
+      toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")
+    }
+    val s2 = toBeCleanedShuffleIds.synchronized {
+      toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")
+    }
+    val s3 = toBeCleanedBroadcstIds.synchronized {
+      toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")
+    }
     s"""
-      |\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")}
-      |\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")}
-      |\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")}
+       |\tRDDs = $s1
+       |\tShuffles = $s2
+       |\tBroadcasts = $s3
     """.stripMargin
   }
 
   private def isAllCleanedUp =
-    toBeCleanedRDDIds.isEmpty &&
-    toBeCleanedShuffleIds.isEmpty &&
-    toBeCleanedBroadcstIds.isEmpty &&
-    toBeCheckpointIds.isEmpty
+    toBeCleanedRDDIds.synchronized { toBeCleanedRDDIds.isEmpty } &&
+    toBeCleanedShuffleIds.synchronized { toBeCleanedShuffleIds.isEmpty } &&
+    toBeCleanedBroadcstIds.synchronized { toBeCleanedBroadcstIds.isEmpty } &&
+    toBeCheckpointIds.synchronized { toBeCheckpointIds.isEmpty }
 
   private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
     blockManager.master.getMatchingBlockIds( _ match {

http://git-wip-us.apache.org/repos/asf/spark/blob/31d069d4/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ca5d13d..4460b6b 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
       Seconds(10), StorageLevel.MEMORY_ONLY,
       awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
 
-    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+    val collected = new mutable.HashSet[Int]
     stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
-      collected ++= rdd.collect()
-      logInfo("Collected = " + collected.mkString(", "))
+      collected.synchronized {
+        collected ++= rdd.collect()
+        logInfo("Collected = " + collected.mkString(", "))
+      }
     }
     ssc.start()
 
     val testData = 1 to 10
     eventually(timeout(120 seconds), interval(10 second)) {
       testUtils.pushData(testData, aggregateTestData)
-      assert(collected === testData.toSet, "\nData received does not match data sent")
+      assert(collected.synchronized { collected === testData.toSet },
+        "\nData received does not match data sent")
     }
     ssc.stop(stopSparkContext = false)
   }
@@ -205,10 +208,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     stream shouldBe a [ReceiverInputDStream[_]]
 
-    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+    val collected = new mutable.HashSet[Int]
     stream.foreachRDD { rdd =>
-      collected ++= rdd.collect()
-      logInfo("Collected = " + collected.mkString(", "))
+      collected.synchronized {
+        collected ++= rdd.collect()
+        logInfo("Collected = " + collected.mkString(", "))
+      }
     }
     ssc.start()
 
@@ -216,7 +221,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     eventually(timeout(120 seconds), interval(10 second)) {
       testUtils.pushData(testData, aggregateTestData)
       val modData = testData.map(_ + 5)
-      assert(collected === modData.toSet, "\nData received does not match data sent")
+      assert(collected.synchronized { collected === modData.toSet },
+        "\nData received does not match data sent")
     }
     ssc.stop(stopSparkContext = false)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org