Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/06/28 08:11:36 UTC

[GitHub] [spark] juliuszsompolski commented on a diff in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

juliuszsompolski commented on code in PR #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r908175059


##########
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.Semaphore
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
+import org.apache.spark.internal.config
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
+
+class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
+    with ResetSystemProperties with Eventually {
+
+  val numExecs = 3
+  val numParts = 3
+
+  test(s"verify that an already running task which is going to cache data succeeds " +
+    s"on a decommissioned executor") {
+    runDecomTest(true, false, true)
+  }
+
+  test(s"verify that shuffle blocks are migrated") {
+    runDecomTest(false, true, false)
+  }
+
+  test(s"verify that both migrations can work at the same time.") {
+    runDecomTest(true, true, false)
+  }
+
+  private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = {
+
+    val master = s"local-cluster[${numExecs}, 1, 1024]"
+    val conf = new SparkConf().setAppName("test").setMaster(master)
+      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
+      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+      // Just replicate blocks as fast as we can during testing, there isn't another
+      // workload we need to worry about.
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+
+    sc = new SparkContext(master, "test", conf)
+
+    // Wait for the executors to start
+    TestUtils.waitUntilExecutorsUp(sc = sc,
+      numExecutors = numExecs,
+      timeout = 60000) // 60s
+
+    val input = sc.parallelize(1 to numParts, numParts)
+    val accum = sc.longAccumulator("mapperRunAccumulator")
+    input.count()
+
+    // Create a new RDD where we have sleep in each partition, we are also increasing
+    // the value of accumulator in each partition
+    val baseRdd = input.mapPartitions { x =>
+      if (migrateDuring) {
+        Thread.sleep(1000)
+      }
+      accum.add(1)
+      x.map(y => (y, y))
+    }
+    val testRdd = shuffle match {
+      case true => baseRdd.reduceByKey(_ + _)
+      case false => baseRdd
+    }
+
+    // Listen for the job & block updates
+    val taskStartSem = new Semaphore(0)
+    val broadcastSem = new Semaphore(0)
+    val executorRemovedSem = new Semaphore(0)
+    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+    val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
+    sc.addSparkListener(new SparkListener {
+
+      override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
+        executorRemovedSem.release()
+      }
+
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStartSem.release()
+      }
+
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        taskEndEvents.append(taskEnd)
+      }
+
+      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+        // Once broadcasts start landing on the executors we're good to proceed.
+        // We don't only use task start as it can occur before the work is on the executor.
+        if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
+          broadcastSem.release()
+        }
+        blocksUpdated.append(blockUpdated)
+      }
+    })
+
+
+    // Cache the RDD lazily
+    if (persist) {
+      testRdd.persist()
+    }
+
+    // Start the computation of RDD - this step will also cache the RDD
+    val asyncCount = testRdd.countAsync()
+
+    // Wait for the job to have started.
+    taskStartSem.acquire(1)
+    // Wait for each executor + driver to have its broadcast info delivered.
+    broadcastSem.acquire((numExecs + 1))
+
+    // Make sure the job is either mid run or otherwise has data to migrate.
+    if (migrateDuring) {
+      // Give Spark a tiny bit to start executing after the broadcast blocks land.
+      // For me this works at 100, set to 300 for system variance.
+      Thread.sleep(300)
+    } else {
+      ThreadUtils.awaitResult(asyncCount, 15.seconds)
+    }
+
+    // Decommission one of the executors.
+    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+    val execs = sched.getExecutorIds()
+    assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")
+
+    val execToDecommission = execs.head
+    logDebug(s"Decommissioning executor ${execToDecommission}")
+    sched.decommissionExecutor(execToDecommission)
+
+    // Wait for job to finish.
+    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
+    assert(asyncCountResult === numParts)
+    // All tasks finished, so accum should have been increased numParts times.
+    assert(accum.value === numParts)
+
+    sc.listenerBus.waitUntilEmpty()
+    if (shuffle) {
+      //  mappers & reducers which succeeded
+      assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
+        s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
+    } else {
+      // only mappers which executed successfully
+      assert(taskEndEvents.count(_.reason == Success) === numParts,
+        s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
+    }
+
+    // Wait for our respective blocks to have migrated
+    eventually(timeout(30.seconds), interval(10.milliseconds)) {
+      if (persist) {
+        // One of our blocks should have moved.
+        val rddUpdates = blocksUpdated.filter { update =>
+          val blockId = update.blockUpdatedInfo.blockId
+          blockId.isRDD}
+        val blockLocs = rddUpdates.map { update =>
+          (update.blockUpdatedInfo.blockId.name,
+            update.blockUpdatedInfo.blockManagerId)}
+        val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
+        assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
+          s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
+          s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
+      }

Review Comment:
   @holdenk 
   If `shuffle == true` and `when` is `TASK_STARTED` or `TASK_ENDED`, isn't it possible that no block would have moved?
   Node decommissioning is triggered during the mapper stage (after a task has started, or after a task has ended). Couldn't decommissioning finish before the reducer stage even starts, so that the persisted RDD blocks never move and the reducer stage simply runs on the 2 remaining executors from the start?
   
   The test passes, so that doesn't seem to be what happens, but I am seeing failures here after making an unrelated change, so I'm wondering whether I'm misunderstanding something.
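   
   To make the concern concrete, here is a minimal sketch in plain Scala (no Spark involved). `BlockUpdate`, `MigrationCheck` and `anyBlockReportedTwice` are hypothetical names, not part of the suite; they only stand in for the `(blockId.name, blockManagerId)` pairs and the `blocksToManagers` check quoted above. Like that check, the helper counts update events per block. Under the assumption that each persisted block is reported exactly once, by the executor that computed it, the check would fail:

   ```scala
   // Hypothetical stand-in for the (blockId.name, blockManagerId) pairs
   // that the suite derives from SparkListenerBlockUpdated events.
   final case class BlockUpdate(blockName: String, blockManager: String)

   object MigrationCheck {
     // Mirrors blockLocs.groupBy(_._1).mapValues(_.size) followed by
     // filter(_._2 > 1): true if any block has more than one update event.
     def anyBlockReportedTwice(updates: Seq[BlockUpdate]): Boolean =
       updates.groupBy(_.blockName).exists { case (_, events) => events.size > 1 }

     // Assumed scenario A: rdd_1_0 is cached on exec-0, then migrated to exec-1.
     val migrated: Seq[BlockUpdate] = Seq(
       BlockUpdate("rdd_1_0", "exec-0"),
       BlockUpdate("rdd_1_1", "exec-1"),
       BlockUpdate("rdd_1_2", "exec-2"),
       BlockUpdate("rdd_1_0", "exec-1")
     )

     // Assumed scenario B (the one asked about): exec-0 is gone before the
     // reducer stage runs, so every block is only ever reported once.
     val neverMoved: Seq[BlockUpdate] = Seq(
       BlockUpdate("rdd_1_0", "exec-1"),
       BlockUpdate("rdd_1_1", "exec-2"),
       BlockUpdate("rdd_1_2", "exec-1")
     )
   }
   ```

   With these inputs, `MigrationCheck.anyBlockReportedTwice(MigrationCheck.migrated)` holds, while the same call on `MigrationCheck.neverMoved` does not, which is the case where the `eventually` block would time out.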



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: reviews-help@spark.apache.org