You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/21 19:17:47 UTC
spark git commit: [SPARK-18031][TESTS] Fix flaky test
ExecutorAllocationManagerSuite.basic functionality
Repository: spark
Updated Branches:
refs/heads/master 607a1e63d -> ccfe60a83
[SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality
## What changes were proposed in this pull request?
The test is flaky because `test("basic functionality")` doesn't block until `ExecutorAllocationManager.manageAllocation` has been called. This PR adds `StreamManualClock` so that the test can block until the clock is waiting at the expected time, which makes the test deterministic.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #16321 from zsxwing/SPARK-18031.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccfe60a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccfe60a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccfe60a8
Branch: refs/heads/master
Commit: ccfe60a8304871779ff1b31b8c2d724f59d5b2af
Parents: 607a1e6
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Dec 21 11:17:44 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 21 11:17:44 2016 -0800
----------------------------------------------------------------------
.../ExecutorAllocationManagerSuite.scala | 36 +++++++++++++++++---
1 file changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ccfe60a8/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index b49e579..1d2bf35 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
private val batchDurationMillis = 1000L
private var allocationClient: ExecutorAllocationClient = null
- private var clock: ManualClock = null
+ private var clock: StreamManualClock = null
before {
allocationClient = mock[ExecutorAllocationClient]
- clock = new ManualClock()
+ clock = new StreamManualClock()
}
test("basic functionality") {
@@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
reset(allocationClient)
when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
- clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+ val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+ val expectedWaitTime = clock.getTimeMillis() + advancedTime
+ clock.advance(advancedTime)
+ // Make sure ExecutorAllocationManager.manageAllocation is called
eventually(timeout(10 seconds)) {
- body
+ assert(clock.isStreamWaitingAt(expectedWaitTime))
}
+ body
}
/** Verify that the expected number of total executor were requested */
@@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
}
}
}
+
+/**
+ * A special manual clock that provides `isStreamWaitingAt` to allow the user to check if the clock
+ * is blocking.
+ */
+class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
+ private var waitStartTime: Option[Long] = None
+
+ override def waitTillTime(targetTime: Long): Long = synchronized {
+ try {
+ waitStartTime = Some(getTimeMillis())
+ super.waitTillTime(targetTime)
+ } finally {
+ waitStartTime = None
+ }
+ }
+
+ /**
+ * Returns true if the clock is blocking and the time at which it started to block equals `time`.
+ */
+ def isStreamWaitingAt(time: Long): Boolean = synchronized {
+ waitStartTime == Some(time)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org