You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2020/07/30 19:01:04 UTC

[spark] branch master updated: [SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6032c5b  [SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite
6032c5b is described below

commit 6032c5b0320fe70455586f4ce863d5d9361b5e07
Author: Devesh Agrawal <de...@gmail.com>
AuthorDate: Thu Jul 30 12:00:19 2020 -0700

    [SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite
    
    ### What changes were proposed in this pull request?
    
    This change tries to fix the flakiness of BlockManagerDecommissionIntegrationSuite.
    
    ### Description of the problem
    
    Make the block manager decommissioning test less flaky.
    
    An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
    - We schedule the job with tasks on executors 0, 1, 2.
    - We wait 300 ms and decommission executor 0.
    - If the task is not yet done on executor 0, it will now fail because
       the block manager won't be able to save the block. This condition is
       easy to trigger on a loaded machine where the github checks run.
    - The task will retry on a different executor (1 or 2) and its shuffle
       blocks will land there.
    - No actual block migration happens here because the decommissioned
       executor technically failed before it could even produce a block.
    
    To remove the above race, this change replaces the fixed 300 ms wait with a wait for an actual task to succeed. Once a task has succeeded, we know its blocks have been written, and thus its executor will certainly be forced to migrate those blocks when it is decommissioned.
    
    The change always decommissions an executor on which a real task finished successfully instead of picking the first executor, because the system may choose to schedule nothing on the first executor and instead run two tasks on one executor.
    
    ### Why are the changes needed?
    
    I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, unit test only change.
    
    ### How was this patch tested?
    
    Github checks. Ran this test 100 times, 10 at a time in parallel in a script.
    
    Closes #29226 from agrawaldevesh/block-manager-decom-flaky.
    
    Authored-by: Devesh Agrawal <de...@gmail.com>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../BlockManagerDecommissionIntegrationSuite.scala | 147 +++++++++++++++------
 1 file changed, 103 insertions(+), 44 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 5741010..6a52f72 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.storage
 
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
@@ -28,29 +29,40 @@ import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
+import org.apache.spark.util.{ResetSystemProperties, SystemClock, ThreadUtils}
 
 class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
     with ResetSystemProperties with Eventually {
 
   val numExecs = 3
   val numParts = 3
+  val TaskStarted = "TASK_STARTED"
+  val TaskEnded = "TASK_ENDED"
+  val JobEnded = "JOB_ENDED"
 
   test(s"verify that an already running task which is going to cache data succeeds " +
-    s"on a decommissioned executor") {
-    runDecomTest(true, false, true)
+    s"on a decommissioned executor after task start") {
+    runDecomTest(true, false, TaskStarted)
   }
 
-  test(s"verify that shuffle blocks are migrated") {
-    runDecomTest(false, true, false)
+  test(s"verify that an already running task which is going to cache data succeeds " +
+    s"on a decommissioned executor after one task ends but before job ends") {
+    runDecomTest(true, false, TaskEnded)
   }
 
-  test(s"verify that both migrations can work at the same time.") {
-    runDecomTest(true, true, false)
+  test(s"verify that shuffle blocks are migrated") {
+    runDecomTest(false, true, JobEnded)
   }
 
-  private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = {
+  test(s"verify that both migrations can work at the same time") {
+    runDecomTest(true, true, JobEnded)
+  }
 
+  private def runDecomTest(
+      persist: Boolean,
+      shuffle: Boolean,
+      whenToDecom: String): Unit = {
+    val migrateDuring = whenToDecom != JobEnded
     val master = s"local-cluster[${numExecs}, 1, 1024]"
     val conf = new SparkConf().setAppName("test").setMaster(master)
       .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
@@ -61,6 +73,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
 
+    if (whenToDecom == TaskStarted) {
+      // We are using accumulators below, make sure those are reported frequently.
+      conf.set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "10ms")
+    }
     sc = new SparkContext(master, "test", conf)
 
     // Wait for the executors to start
@@ -70,15 +86,29 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
 
     val input = sc.parallelize(1 to numParts, numParts)
     val accum = sc.longAccumulator("mapperRunAccumulator")
-    input.count()
+
+    val sleepIntervalMs = whenToDecom match {
+      // Increase the window of time b/w task started and ended so that we can decom within that.
+      case TaskStarted => 2000
+      // Make one task take a really short time so that we can decommission right after it is
+      // done but before its peers are done.
+      case TaskEnded =>
+        if (TaskContext.getPartitionId() == 0) {
+          100
+        } else {
+          1000
+        }
+      // No sleep otherwise
+      case _ => 0
+    }
 
     // Create a new RDD where we have sleep in each partition, we are also increasing
     // the value of accumulator in each partition
     val baseRdd = input.mapPartitions { x =>
-      if (migrateDuring) {
-        Thread.sleep(1000)
-      }
       accum.add(1)
+      if (sleepIntervalMs > 0) {
+        Thread.sleep(sleepIntervalMs)
+      }
       x.map(y => (y, y))
     }
     val testRdd = shuffle match {
@@ -87,35 +117,46 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     }
 
     // Listen for the job & block updates
-    val taskStartSem = new Semaphore(0)
-    val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
-    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+    val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
+    val executorsActuallyStarted = new ConcurrentHashMap[String, Boolean]()
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
-    sc.addSparkListener(new SparkListener {
 
+    def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
+      executorsActuallyStarted.keySet().asScala.headOption
+    } else {
+      taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
+    }
+
+    sc.addSparkListener(new SparkListener {
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
       }
 
-      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-        taskStartSem.release()
-      }
-
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-        taskEndEvents.append(taskEnd)
+        taskEndEvents.add(taskEnd)
       }
 
       override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
-        // Once broadcast start landing on the executors we're good to proceed.
-        // We don't only use task start as it can occur before the work is on the executor.
-        if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
-          broadcastSem.release()
-        }
         blocksUpdated.append(blockUpdated)
       }
-    })
 
+      override def onExecutorMetricsUpdate(
+          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+        val executorId = executorMetricsUpdate.execId
+        if (executorId != SparkContext.DRIVER_IDENTIFIER) {
+          val validUpdate = executorMetricsUpdate
+            .accumUpdates
+            .flatMap(_._4)
+            .exists { accumInfo =>
+              accumInfo.name == accum.name && accumInfo.update.exists(_.asInstanceOf[Long] >= 1)
+            }
+          if (validUpdate) {
+            executorsActuallyStarted.put(executorId, java.lang.Boolean.TRUE)
+          }
+        }
+      }
+    })
 
     // Cache the RDD lazily
     if (persist) {
@@ -125,28 +166,32 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.
+      val intervalMs = if (whenToDecom == TaskStarted) {
+        3.milliseconds
+      } else {
+        10.milliseconds
+      }
+      eventually(timeout(6.seconds), interval(intervalMs)) {
+        assert(getCandidateExecutorToDecom.isDefined)
+      }
     } else {
       ThreadUtils.awaitResult(asyncCount, 15.seconds)
     }
 
     // Decommission one of the executors.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
-    val execs = sched.getExecutorIds()
-    assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")
 
-    val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    val execToDecommission = getCandidateExecutorToDecom.get
+    logInfo(s"Decommissioning executor ${execToDecommission}")
+    sched.decommissionExecutor(
+      execToDecommission,
+      ExecutorDecommissionInfo("", isHostDecommissioned = false))
+    val decomTime = new SystemClock().getTimeMillis()
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
@@ -155,16 +200,31 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     assert(accum.value === numParts)
 
     sc.listenerBus.waitUntilEmpty()
+    val taskEndEventsCopy = taskEndEvents.asScala
     if (shuffle) {
       //  mappers & reducers which succeeded
-      assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
+      assert(taskEndEventsCopy.count(_.reason == Success) === 2 * numParts,
         s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
     } else {
       // only mappers which executed successfully
-      assert(taskEndEvents.count(_.reason == Success) === numParts,
+      assert(taskEndEventsCopy.count(_.reason == Success) === numParts,
         s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
     }
 
+    val minTaskEndTime = taskEndEventsCopy.map(_.taskInfo.finishTime).min
+    val maxTaskEndTime = taskEndEventsCopy.map(_.taskInfo.finishTime).max
+
+    // Verify that the decom time matched our expectations
+    val decomAssertMsg = s"$whenToDecom: decomTime: $decomTime, minTaskEnd: $minTaskEndTime," +
+      s" maxTaskEnd: $maxTaskEndTime"
+    assert(minTaskEndTime <= maxTaskEndTime, decomAssertMsg)
+    whenToDecom match {
+      case TaskStarted => assert(minTaskEndTime > decomTime, decomAssertMsg)
+      case TaskEnded => assert(minTaskEndTime <= decomTime &&
+        decomTime < maxTaskEndTime, decomAssertMsg)
+      case JobEnded => assert(maxTaskEndTime <= decomTime, decomAssertMsg)
+    }
+
     // Wait for our respective blocks to have migrated
     eventually(timeout(30.seconds), interval(10.milliseconds)) {
       if (persist) {
@@ -224,6 +284,5 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // should have same value like before
     assert(testRdd.count() === numParts)
     assert(accum.value === numParts)
-
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org