You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/04/26 21:50:59 UTC
[spark] branch branch-2.4 updated:
[SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in
YarnSchedulerBackendSuite
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new ec53a19 [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite
ec53a19 is described below
commit ec53a19adbffba5a9f3e71cca4898f1bd7804abf
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Fri Apr 26 14:49:49 2019 -0700
[SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite
## What changes were proposed in this pull request?
The test "RequestExecutors reflects node blacklist and is serializable" is flaky because of multi threaded access of the mock task scheduler. For details check [Mockito FAQ (occasional exceptions like: WrongTypeOfReturnValue)](https://github.com/mockito/mockito/wiki/FAQ#is-mockito-thread-safe). So instead of mocking the task scheduler in the test TaskSchedulerImpl is simply subclassed.
This multithreaded access of the `nodeBlacklist()` method is coming from:
1) the unit test thread via calling of the method `prepareRequestExecutors()`
2) the `DriverEndpoint.onStart` which runs a periodic task that ends up calling this method
## How was this patch tested?
Existing unittest.
(cherry picked from commit e4e4e2b842bffba6805623f2258b27b162b451ba)
Closes #24474 from attilapiros/SPARK-26891-branch-2.4.
Authored-by: “attilapiros” <pi...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../cluster/YarnSchedulerBackendSuite.scala | 35 +++++++++++++++++-----
1 file changed, 28 insertions(+), 7 deletions(-)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
index 7fac57f..bd2cf97 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.scheduler.cluster
+import java.util.concurrent.atomic.AtomicReference
+
import scala.language.reflectiveCalls
import org.mockito.Mockito.when
@@ -27,15 +29,35 @@ import org.apache.spark.serializer.JavaSerializer
class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with LocalSparkContext {
+ private var yarnSchedulerBackend: YarnSchedulerBackend = _
+
+ override def afterEach(): Unit = {
+ try {
+ if (yarnSchedulerBackend != null) {
+ yarnSchedulerBackend.stop()
+ }
+ } finally {
+ super.afterEach()
+ }
+ }
+
test("RequestExecutors reflects node blacklist and is serializable") {
sc = new SparkContext("local", "YarnSchedulerBackendSuite")
- val sched = mock[TaskSchedulerImpl]
- when(sched.sc).thenReturn(sc)
- val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) {
+ // Subclassing the TaskSchedulerImpl here instead of using Mockito. For details see SPARK-26891.
+ val sched = new TaskSchedulerImpl(sc) {
+ val blacklistedNodes = new AtomicReference[Set[String]]()
+
+ def setNodeBlacklist(nodeBlacklist: Set[String]): Unit = blacklistedNodes.set(nodeBlacklist)
+
+ override def nodeBlacklist(): Set[String] = blacklistedNodes.get()
+ }
+
+ val yarnSchedulerBackendExtended = new YarnSchedulerBackend(sched, sc) {
def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
this.hostToLocalTaskCount = hostToLocalTaskCount
}
}
+ yarnSchedulerBackend = yarnSchedulerBackendExtended
val ser = new JavaSerializer(sc.conf).newInstance()
for {
blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
@@ -45,16 +67,15 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
Map("a" -> 1, "b" -> 2)
)
} {
- yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount)
- when(sched.nodeBlacklist()).thenReturn(blacklist)
- val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested)
+ yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount)
+ sched.setNodeBlacklist(blacklist)
+ val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numRequested)
assert(req.requestedTotal === numRequested)
assert(req.nodeBlacklist === blacklist)
assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
// Serialize to make sure serialization doesn't throw an error
ser.serialize(req)
}
- sc.stop()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org