You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/04/26 21:50:59 UTC

[spark] branch branch-2.4 updated: [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ec53a19  [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite
ec53a19 is described below

commit ec53a19adbffba5a9f3e71cca4898f1bd7804abf
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Fri Apr 26 14:49:49 2019 -0700

    [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite
    
    ## What changes were proposed in this pull request?
    
    The test "RequestExecutors reflects node blacklist and is serializable" is flaky because of multi-threaded access to the mock task scheduler. For details, check the [Mockito FAQ (occasional exceptions like: WrongTypeOfReturnValue)](https://github.com/mockito/mockito/wiki/FAQ#is-mockito-thread-safe). So instead of mocking the task scheduler in the test, `TaskSchedulerImpl` is simply subclassed.
    
    This multi-threaded access to the `nodeBlacklist()` method comes from:
    1) the unit test thread, which calls the method `prepareRequestExecutors()`
    2) `DriverEndpoint.onStart`, which runs a periodic task that ends up calling this method
    
    ## How was this patch tested?
    
    Existing unit test.
    
    (cherry picked from commit e4e4e2b842bffba6805623f2258b27b162b451ba)
    
    Closes #24474 from attilapiros/SPARK-26891-branch-2.4.
    
    Authored-by: “attilapiros” <pi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../cluster/YarnSchedulerBackendSuite.scala        | 35 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
index 7fac57f..bd2cf97 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.scheduler.cluster
 
+import java.util.concurrent.atomic.AtomicReference
+
 import scala.language.reflectiveCalls
 
 import org.mockito.Mockito.when
@@ -27,15 +29,35 @@ import org.apache.spark.serializer.JavaSerializer
 
 class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with LocalSparkContext {
 
+  private var yarnSchedulerBackend: YarnSchedulerBackend = _
+
+  override def afterEach(): Unit = {
+    try {
+      if (yarnSchedulerBackend != null) {
+        yarnSchedulerBackend.stop()
+      }
+    } finally {
+      super.afterEach()
+    }
+  }
+
   test("RequestExecutors reflects node blacklist and is serializable") {
     sc = new SparkContext("local", "YarnSchedulerBackendSuite")
-    val sched = mock[TaskSchedulerImpl]
-    when(sched.sc).thenReturn(sc)
-    val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) {
+    // Subclassing the TaskSchedulerImpl here instead of using Mockito. For details see SPARK-26891.
+    val sched = new TaskSchedulerImpl(sc) {
+      val blacklistedNodes = new AtomicReference[Set[String]]()
+
+      def setNodeBlacklist(nodeBlacklist: Set[String]): Unit = blacklistedNodes.set(nodeBlacklist)
+
+      override def nodeBlacklist(): Set[String] = blacklistedNodes.get()
+    }
+
+    val yarnSchedulerBackendExtended = new YarnSchedulerBackend(sched, sc) {
       def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
         this.hostToLocalTaskCount = hostToLocalTaskCount
       }
     }
+    yarnSchedulerBackend = yarnSchedulerBackendExtended
     val ser = new JavaSerializer(sc.conf).newInstance()
     for {
       blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
@@ -45,16 +67,15 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
         Map("a" -> 1, "b" -> 2)
       )
     } {
-      yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount)
-      when(sched.nodeBlacklist()).thenReturn(blacklist)
-      val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested)
+      yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount)
+      sched.setNodeBlacklist(blacklist)
+      val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numRequested)
       assert(req.requestedTotal === numRequested)
       assert(req.nodeBlacklist === blacklist)
       assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
       // Serialize to make sure serialization doesn't throw an error
       ser.serialize(req)
     }
-    sc.stop()
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org