Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/30 06:03:41 UTC

[GitHub] [spark] felixcheung commented on a change in pull request #25236: [SPARK-28487][k8s] More responsive dynamic allocation with K8S.

felixcheung commented on a change in pull request #25236: [SPARK-28487][k8s] More responsive dynamic allocation with K8S.
URL: https://github.com/apache/spark/pull/25236#discussion_r308542339
 
 

 ##########
 File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 ##########
 @@ -66,97 +66,167 @@ private[spark] class ExecutorPodsAllocator(
   // snapshot yet. Mapped to the timestamp when they were created.
   private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
 
+  private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
+
+  private val hasPendingPods = new AtomicBoolean()
+
+  private var lastSnapshot = ExecutorPodsSnapshot(Nil)
+
   def start(applicationId: String): Unit = {
     snapshotsStore.addSubscriber(podAllocationDelay) {
       onNewSnapshots(applicationId, _)
     }
   }
 
-  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+  def setTotalExpectedExecutors(total: Int): Unit = {
+    totalExpectedExecutors.set(total)
+    if (!hasPendingPods.get()) {
+      snapshotsStore.notifySubscribers()
+    }
+  }
 
-  private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+  private def onNewSnapshots(
+      applicationId: String,
+      snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
     newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
     // For all executors we've created against the API but have not seen in a snapshot
     // yet - check the current time. If the current time has exceeded some threshold,
     // assume that the pod was either never created (the API server never properly
     // handled the creation request), or the API server created the pod but we missed
     // both the creation and deletion events. In either case, delete the missing pod
     // if possible, and mark such a pod to be rescheduled below.
-    newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
-      val currentTime = clock.getTimeMillis()
+    val currentTime = clock.getTimeMillis()
+    val timedOut = newlyCreatedExecutors.flatMap { case (execId, timeCreated) =>
       if (currentTime - timeCreated > podCreationTimeout) {
-        logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
-          s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
-          " previous allocation attempt tried to create it. The executor may have been" +
-          " deleted but the application missed the deletion event.")
-
-        if (shouldDeleteExecutors) {
-          Utils.tryLogNonFatalError {
-            kubernetesClient
-              .pods()
-              .withLabel(SPARK_APP_ID_LABEL, applicationId)
-              .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-              .delete()
-          }
-        }
-        newlyCreatedExecutors -= execId
+        Some(execId)
       } else {
         logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
           s" was created ${currentTime - timeCreated} milliseconds ago.")
+        None
+      }
+    }
+
+    if (timedOut.nonEmpty) {
+      logWarning(s"Executors with ids ${timedOut.mkString(",")} were not detected in the" +
+        s" Kubernetes cluster after $podCreationTimeout ms despite the fact that a previous" +
+        " allocation attempt tried to create them. The executors may have been deleted but the" +
+        " application missed the deletion event.")
+
+      newlyCreatedExecutors --= timedOut
+      if (shouldDeleteExecutors) {
+        Utils.tryLogNonFatalError {
+          kubernetesClient
+            .pods()
+            .withLabel(SPARK_APP_ID_LABEL, applicationId)
+            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
+            .delete()
+        }
       }
     }
 
     if (snapshots.nonEmpty) {
-      // Only need to examine the cluster as of the latest snapshot, the "current" state, to see if
-      // we need to allocate more executors or not.
-      val latestSnapshot = snapshots.last
-      val currentRunningExecutors = latestSnapshot.executorPods.values.count {
-        case PodRunning(_) => true
+      lastSnapshot = snapshots.last
+    }
+
+    val currentRunningExecutors = lastSnapshot.executorPods.values.count {
+      case PodRunning(_) => true
+      case _ => false
+    }
+
+    val currentPendingExecutors = lastSnapshot.executorPods
+      .filter {
+        case (_, PodPending(_)) => true
         case _ => false
       }
-      val currentPendingExecutors = latestSnapshot.executorPods.values.count {
-        case PodPending(_) => true
-        case _ => false
+      .map { case (id, _) => id }
+
+    if (snapshots.nonEmpty) {
+      logDebug(s"Pod allocation status: $currentRunningExecutors running, " +
+        s"${currentPendingExecutors.size} pending, " +
+        s"${newlyCreatedExecutors.size} unacknowledged.")
+    }
+
+    val currentTotalExpectedExecutors = totalExpectedExecutors.get
+
+    // This variable is used later to print some debug logs. It's updated when cleaning up
+    // excess pod requests, since currentPendingExecutors is immutable.
+    var knownPendingCount = currentPendingExecutors.size
+
+    // It's possible that we have outstanding pods that are outdated when dynamic allocation
+    // decides to downscale the application. So check if we can release any pending pods early
+    // instead of waiting for them to time out. Drop them first from the unacknowledged list,
+    // then from the pending.
+    //
+    // TODO: with dynamic allocation off, handle edge cases if we end up with more running
+    // executors than expected.
+    val knownPodCount = currentRunningExecutors + currentPendingExecutors.size +
+      newlyCreatedExecutors.size
+    if (knownPodCount > currentTotalExpectedExecutors) {
+      val excess = knownPodCount - currentTotalExpectedExecutors
+      val knownPendingToDelete = currentPendingExecutors.take(excess - newlyCreatedExecutors.size)
+      val toDelete = newlyCreatedExecutors.keys.take(excess).toList ++ knownPendingToDelete
+
+      if (toDelete.nonEmpty) {
+        logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
+        Utils.tryLogNonFatalError {
+          kubernetesClient
+            .pods()
+            .withField("status.phase", "Pending")
+            .withLabel(SPARK_APP_ID_LABEL, applicationId)
+            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
+            .delete()
+          newlyCreatedExecutors --= toDelete
+          knownPendingCount -= knownPendingToDelete.size
+        }
       }
-      val currentTotalExpectedExecutors = totalExpectedExecutors.get
-      logDebug(s"Currently have $currentRunningExecutors running executors and" +
-        s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" +
-        s" have been requested but are pending appearance in the cluster.")
-      if (newlyCreatedExecutors.isEmpty
-        && currentPendingExecutors == 0
+    }
+
+    if (newlyCreatedExecutors.isEmpty
+        && currentPendingExecutors.isEmpty
         && currentRunningExecutors < currentTotalExpectedExecutors) {
-        val numExecutorsToAllocate = math.min(
-          currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
-        logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
-        for ( _ <- 0 until numExecutorsToAllocate) {
-          val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
-          val executorConf = KubernetesConf.createExecutorConf(
-            conf,
-            newExecutorId.toString,
-            applicationId,
-            driverPod)
-          val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
-            kubernetesClient)
-          val podWithAttachedContainer = new PodBuilder(executorPod.pod)
-            .editOrNewSpec()
-            .addToContainers(executorPod.container)
-            .endSpec()
-            .build()
-          kubernetesClient.pods().create(podWithAttachedContainer)
-          newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
-          logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
-        }
-      } else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
-        // TODO handle edge cases if we end up with more running executors than expected.
-        logDebug("Current number of running executors is equal to the number of requested" +
-          " executors. Not scaling up further.")
-      } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
-        logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" +
-          s" executors to begin running before requesting for more executors. # of executors in" +
-          s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" +
-          s" created but we have not observed as being present in the cluster yet:" +
-          s" ${newlyCreatedExecutors.size}.")
+      val numExecutorsToAllocate = math.min(
+        currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
+      logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
+      for ( _ <- 0 until numExecutorsToAllocate) {
+        val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
+        val executorConf = KubernetesConf.createExecutorConf(
+          conf,
+          newExecutorId.toString,
+          applicationId,
+          driverPod)
+        val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
+          kubernetesClient)
+        val podWithAttachedContainer = new PodBuilder(executorPod.pod)
+          .editOrNewSpec()
+          .addToContainers(executorPod.container)
+          .endSpec()
+          .build()
+        kubernetesClient.pods().create(podWithAttachedContainer)
+        newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
+        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+      }
+    }
+
+    // Update the flag that helps the setTotalExpectedExecutors() callback avoid trigerring this
 
 Review comment:
  trigerring -> triggering
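
For readers skimming the thread, the comment under review sits at the end of a flag-and-notify pattern this PR introduces: setTotalExpectedExecutors() pokes the snapshot store only when no pod requests are outstanding, and onNewSnapshots() records that state after each pass. The sketch below is a minimal, hypothetical illustration of that pattern only; SnapshotStore, Allocator and Demo are simplified stand-ins, not the real Spark classes.

    import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

    // Simplified stand-in for ExecutorPodsSnapshotsStore: subscribers are plain callbacks.
    class SnapshotStore {
      private var subscribers: List[() => Unit] = Nil
      def addSubscriber(cb: () => Unit): Unit = subscribers ::= cb
      def notifySubscribers(): Unit = subscribers.foreach(cb => cb())
    }

    // Simplified stand-in for ExecutorPodsAllocator showing only the flag-and-notify flow.
    class Allocator(store: SnapshotStore) {
      private val totalExpectedExecutors = new AtomicInteger(0)
      private val hasPendingPods = new AtomicBoolean(false)

      store.addSubscriber(() => onNewSnapshots())

      def setTotalExpectedExecutors(total: Int): Unit = {
        totalExpectedExecutors.set(total)
        // Only wake the allocation loop when nothing is already in flight; otherwise
        // the next snapshot (or the periodic timer) picks up the new target anyway.
        if (!hasPendingPods.get()) {
          store.notifySubscribers()
        }
      }

      private def onNewSnapshots(): Unit = synchronized {
        val target = totalExpectedExecutors.get()
        // ... reconcile running, pending and unacknowledged pods against `target` here ...
        val outstanding = 0 // pending + unacknowledged requests left after reconciliation
        // Record whether requests are still outstanding so the next
        // setTotalExpectedExecutors() call knows whether to notify immediately.
        hasPendingPods.set(outstanding > 0)
      }
    }

    object Demo extends App {
      val store = new SnapshotStore
      val allocator = new Allocator(store)
      allocator.setTotalExpectedExecutors(5) // no pods in flight, so this notifies right away
    }

The point of the flag is to avoid redundant wake-ups: while requests are already outstanding, a changed executor target will be reconciled on the next snapshot pass anyway.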

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org