You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/05/06 08:25:32 UTC
spark git commit: [SPARK-7384][Core][Tests] Fix flaky tests for
distributed mode in BroadcastSuite
Repository: spark
Updated Branches:
refs/heads/master 7b1457839 -> 9f019c722
[SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite
Fixed the following failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.3-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/452/testReport/junit/org.apache.spark.broadcast/BroadcastSuite/Unpersisting_HttpBroadcast_on_executors_and_driver_in_distributed_mode/
The tests should wait until all slaves are up. Otherwise, there may be only a part of `BlockManager`s registered, and fail the tests.
Author: zsxwing <zs...@gmail.com>
Closes #5925 from zsxwing/SPARK-7384 and squashes the following commits:
783cb7b [zsxwing] Add comments for _jobProgressListener and remove postfixOps
1009ef1 [zsxwing] [SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f019c72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f019c72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f019c72
Branch: refs/heads/master
Commit: 9f019c7223bb79b8d5cd52980b2723a1601d1134
Parents: 7b14578
Author: zsxwing <zs...@gmail.com>
Authored: Tue May 5 23:25:28 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue May 5 23:25:28 2015 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/SparkContext.scala | 8 +++++---
.../org/apache/spark/broadcast/BroadcastSuite.scala | 14 +++++++++++++-
2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9f019c72/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 682dec4..b5f040c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -407,15 +407,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+ // "_jobProgressListener" should be set up before creating SparkEnv because when creating
+ // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
+ _jobProgressListener = new JobProgressListener(_conf)
+ listenerBus.addListener(jobProgressListener)
+
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
_metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
- _jobProgressListener = new JobProgressListener(_conf)
- listenerBus.addListener(jobProgressListener)
-
_statusTracker = new SparkStatusTracker(this)
_progressBar =
http://git-wip-us.apache.org/repos/asf/spark/blob/9f019c72/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c8fdfa6..06e5f1c 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.broadcast
+import scala.concurrent.duration._
import scala.util.Random
import org.scalatest.{Assertions, FunSuite}
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkEnv}
import org.apache.spark.io.SnappyCompressionCodec
@@ -307,7 +309,17 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
removeFromDriver: Boolean) {
sc = if (distributed) {
- new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+ val _sc =
+ new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+ // Wait until all slaves are up
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
+ _sc.jobProgressListener.synchronized {
+ val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
+ assert(numBlockManagers == numSlaves + 1,
+ s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
+ }
+ }
+ _sc
} else {
new SparkContext("local", "test", broadcastConf)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org