Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/29 08:01:45 UTC

[GitHub] [spark] holdenk commented on a change in pull request #29226: [SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite

holdenk commented on a change in pull request #29226:
URL: https://github.com/apache/spark/pull/29226#discussion_r461133372



##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
       This goes against the purpose of this test, which is to make sure that an executor that receives a decommission request while running a task has the block migrated. With this change it is no longer migrating during decommissioning in the same way.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
       So I think another way we could make sure the test covers what we want is to run a job repeatedly until all 3 executors come up.
   
   The block manager (in decom state) does indeed refuse puts, but RDD computation on the executor goes through `getOrElseUpdate`, which, if there is no cache hit, immediately calls `doPutIterator` *before* the iterator starts being computed. Since the check to see if the block manager is decommissioning occurs before the start of the computation, not at the end, we want to ensure that the block can be put (and then later migrated).
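
A minimal sketch of the ordering described in this comment, written as a toy model rather than Spark's actual `BlockManager`. Every name in it (`ToyBlockStore`, `startDecommission`, `storedBlockIds`) is hypothetical, and it assumes only what the comment states: the decommissioning check runs on entry to the put path, before the block's iterator is computed.

```scala
import scala.collection.mutable

// Toy model only: NOT Spark's BlockManager. All names here are hypothetical.
final class ToyBlockStore {
  @volatile private var decommissioning = false
  private val blocks = mutable.Map.empty[String, Vector[Int]]

  def startDecommission(): Unit = decommissioning = true

  // `compute` is by-name, so it is not evaluated until after the check below.
  def getOrElseUpdate(id: String)(compute: => Iterator[Int]): Either[String, Vector[Int]] =
    blocks.synchronized { blocks.get(id) } match {
      case Some(cached) => Right(cached)
      case None =>
        if (decommissioning) {
          // Puts that start after decommissioning begins are refused up front.
          Left(s"rejected $id: store is decommissioning")
        } else {
          // The put is already admitted here, even if decommissioning
          // begins while `compute` is still running.
          val data = compute.toVector
          blocks.synchronized { blocks.update(id, data) }
          Right(data)
        }
    }

  def storedBlockIds: Set[String] = blocks.synchronized { blocks.keySet.toSet }
}

object ToyBlockStoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new ToyBlockStore
    // Decommissioning starts in the middle of the computation: the block still lands.
    val inFlight = store.getOrElseUpdate("rdd_0_0") {
      store.startDecommission()
      Iterator(1, 2, 3)
    }
    // This put starts after decommissioning began, so it is rejected.
    val late = store.getOrElseUpdate("rdd_0_1")(Iterator(4, 5, 6))
    println(s"inFlight=$inFlight late=$late stored=${store.storedBlockIds}")
  }
}
```

In this model a block whose computation began before the decommission request is still stored and is therefore available to migrate, which is the situation the `migrateDuring` case is meant to exercise.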
   
   

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
       Maybe we should add a comment where we do the `isDecommissioning` check to explain that it is intentionally done there so that we don't reject blocks which have already started computation. Do you think that would help?

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
       I don't believe we need another block manager PR to realize it. I think this is just a flaky test because we took out the original sleep and tried to use TestUtils, which doesn't do a good enough job of waiting for the executor to fully come up.
   
   Since doPut is called *before* the task starts computation, we don't throw away any of the in-progress data.
   
   I'll make an alternate PR to this one to illustrate my understanding, and hopefully we can iron it out and make the code path clear for everyone :)

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -87,36 +113,30 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     }
 
     // Listen for the job & block updates
-    val taskStartSem = new Semaphore(0)
-    val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
-    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+    val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
-    sc.addSparkListener(new SparkListener {
 
+    def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
+      accum.value.asScala.headOption

Review comment:
       I don't think this is going to work as intended: accumulators send updates back at the end of the task, unless something has changed.
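
A minimal sketch of the concern raised here, as a toy model and not Spark's accumulator implementation. The names `ToyDriverAccumulator` and `ToyTask` are hypothetical. The model assumes the behavior stated in the comment: values added inside a task stay in a task-local copy and are merged into the driver-side value only when the task finishes.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: NOT Spark's accumulator code. All names are hypothetical.
final class ToyDriverAccumulator {
  private val merged = ArrayBuffer.empty[String]
  def merge(update: Seq[String]): Unit = synchronized { merged ++= update; () }
  def value: Seq[String] = synchronized { merged.toList }
}

final class ToyTask(executorId: String, driverAccum: ToyDriverAccumulator) {
  private val localUpdates = ArrayBuffer.empty[String]

  // While the task runs, only the task-local copy changes.
  def start(): Unit = { localUpdates += executorId; () }

  // On completion, the local updates are merged into the driver-side value.
  def finish(): Unit = driverAccum.merge(localUpdates.toList)
}

object ToyAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val accum = new ToyDriverAccumulator
    val task = new ToyTask("exec-1", accum)

    task.start()
    println(accum.value.headOption) // None: nothing has reached the driver yet

    task.finish()
    println(accum.value.headOption) // Some(exec-1): visible only after the task ends
  }
}
```

If accumulators do behave this way, a `TaskStarted` lookup via `accum.value.asScala.headOption` would return `None` until at least one task has already ended.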

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -87,36 +113,30 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     }
 
     // Listen for the job & block updates
-    val taskStartSem = new Semaphore(0)
-    val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
-    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+    val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
-    sc.addSparkListener(new SparkListener {
 
+    def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
+      accum.value.asScala.headOption

Review comment:
       If you don't believe that is the case, can we add an assertion in here that none of the tasks have finished yet?



