You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/06/04 00:04:29 UTC
spark git commit: [SPARK-7989] [CORE] [TESTS] Fix flaky tests in
ExternalShuffleServiceSuite and SparkListenerWithClusterSuite
Repository: spark
Updated Branches:
refs/heads/master 1d8669f15 -> f27134782
[SPARK-7989] [CORE] [TESTS] Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite
The flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite will fail if there are not enough executors up before running the jobs.
This PR adds `JobProgressListener.waitUntilExecutorsUp`. The tests for the cluster mode can use it to wait until the expected executors are up.
Author: zsxwing <zs...@gmail.com>
Closes #6546 from zsxwing/SPARK-7989 and squashes the following commits:
5560e09 [zsxwing] Fix a typo
3b69840 [zsxwing] Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2713478
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2713478
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2713478
Branch: refs/heads/master
Commit: f27134782ebb61c360330e2d6d5bb1aa02be3fb6
Parents: 1d8669f
Author: zsxwing <zs...@gmail.com>
Authored: Wed Jun 3 15:04:20 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Jun 3 15:04:20 2015 -0700
----------------------------------------------------------------------
.../spark/ui/jobs/JobProgressListener.scala | 30 ++++++++++++++++++++
.../spark/ExternalShuffleServiceSuite.scala | 8 ++++++
.../apache/spark/broadcast/BroadcastSuite.scala | 10 +------
.../SparkListenerWithClusterSuite.scala | 10 +++++--
4 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f2713478/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f39e961..1d31fce 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -17,8 +17,12 @@
package org.apache.spark.ui.jobs
+import java.util.concurrent.TimeoutException
+
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
+import com.google.common.annotations.VisibleForTesting
+
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
@@ -526,4 +530,30 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
startTime = appStarted.time
}
+
+ /**
+ * For testing only. Wait until at least `numExecutors` executors are up, or throw
+ * `TimeoutException` if the timeout elapses before `numExecutors` executors are up.
+ *
+ * @param numExecutors the minimum number of executors to wait for
+ * @param timeout the maximum time to wait, in milliseconds
+ */
+ @VisibleForTesting
+ private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
+ val finishTime = System.currentTimeMillis() + timeout
+ while (System.currentTimeMillis() < finishTime) {
+ val numBlockManagers = synchronized {
+ blockManagerIds.size
+ }
+ if (numBlockManagers >= numExecutors + 1) {
+ // The `+ 1` accounts for the driver's own block manager
+ return
+ }
+ // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+ // adds overhead in the general case.
+ Thread.sleep(10)
+ }
+ throw new TimeoutException(
+ s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f2713478/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index bac6fdb..5b127a0 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -55,6 +55,14 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
+ // On a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
+ // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
+ // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
+ // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
+ // In this case, we won't receive FetchFailed. And it will make this test fail.
+ // Therefore, we should wait until all slaves are up
+ sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
+
val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
rdd.count()
http://git-wip-us.apache.org/repos/asf/spark/blob/f2713478/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c05e8bb..c054c71 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,11 +17,9 @@
package org.apache.spark.broadcast
-import scala.concurrent.duration._
import scala.util.Random
import org.scalatest.Assertions
-import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.io.SnappyCompressionCodec
@@ -312,13 +310,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val _sc =
new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
// Wait until all salves are up
- eventually(timeout(10.seconds), interval(10.milliseconds)) {
- _sc.jobProgressListener.synchronized {
- val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
- assert(numBlockManagers == numSlaves + 1,
- s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
- }
- }
+ _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
_sc
} else {
new SparkContext("local", "test", broadcastConf)
http://git-wip-us.apache.org/repos/asf/spark/blob/f2713478/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 50273bc..d97fba0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -17,12 +17,12 @@
package org.apache.spark.scheduler
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import scala.collection.mutable
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import scala.collection.mutable
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
/**
* Unit tests for SparkListener that require a local cluster.
@@ -41,6 +41,10 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)
+ // This test will check if the number of executors received by "SparkListener" is the same as
+ // the number of all executors, so we need to wait until all executors are up
+ sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
+
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org