Posted to commits@spark.apache.org by do...@apache.org on 2021/10/20 05:52:10 UTC

[spark] branch branch-3.1 updated: [SPARK-37049][K8S] executorIdleTimeout should check `creationTimestamp` instead of `startTime`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 74fe4fa  [SPARK-37049][K8S] executorIdleTimeout should check `creationTimestamp` instead of `startTime`
74fe4fa is described below

commit 74fe4fadfa76b15560a78ce53f53319f015819e7
Author: Weiwei Yang <wy...@cloudera.com>
AuthorDate: Tue Oct 19 22:42:06 2021 -0700

    [SPARK-37049][K8S] executorIdleTimeout should check `creationTimestamp` instead of `startTime`
    
    SPARK-33099 added support for respecting `spark.dynamicAllocation.executorIdleTimeout` in `ExecutorPodsAllocator`. However, when it checks whether a pending executor pod has timed out, it checks against the pod's [startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667), see code [here](https://github.com/apache/spark/blob/c2ba498ff678ddda034cedf45cc17fbeefe922fd/resource-managers/kubernetes/core/src/main/scala/org/apache/spark [...]
    
    This can be reproduced locally by running the following job:
    
    ```
    ${SPARK_HOME}/bin/spark-submit --master k8s://http://localhost:8001 --deploy-mode cluster \
      --name spark-group-example \
      --class org.apache.spark.examples.GroupByTest \
      --conf spark.executor.instances=1 \
      --conf spark.kubernetes.namespace=spark-test \
      --conf spark.kubernetes.executor.request.cores=1 \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
      --conf spark.shuffle.service.enabled=false \
      --conf spark.kubernetes.container.image=local/spark:3.3.0 \
      --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
      local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar \
      1000 1000 100 1000
    ```
    
    The local cluster doesn't have enough resources to run more than 4 executors, so the remaining executor pods stay pending. The job accumulates a task backlog and triggers requests for more executors from K8s:
    
    ```
    21/10/19 22:51:45 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1 running: 0.
    21/10/19 22:51:51 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1.
    21/10/19 22:51:52 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 4 running: 2.
    21/10/19 22:51:53 INFO ExecutorPodsAllocator: Going to request 4 executors from Kubernetes for ResourceProfile Id: 0, target: 8 running: 4.
    ...
    21/10/19 22:52:14 INFO ExecutorPodsAllocator: Deleting 39 excess pod requests (23,59,32,41,50,68,35,44,17,8,53,62,26,71,11,56,29,38,47,20,65,5,14,46,64,73,55,49,40,67,58,13,22,31,7,16,52,70,43).
    21/10/19 22:52:18 INFO ExecutorPodsAllocator: Deleting 28 excess pod requests (25,34,61,37,10,19,28,60,69,63,45,54,72,36,18,9,27,21,57,12,48,30,39,66,15,42,24,33).
    ```
    
    At `22:51:45`, it starts to request executors; at `22:52:14`, it starts to delete excess executor pods. That is only 29s, yet `spark.dynamicAllocation.executorIdleTimeout` is set to 60s: the config was not honored.
    
    ### What changes were proposed in this pull request?
    Change the check to use the pod's `creationTimestamp` instead of its `startTime`. [creationTimestamp](https://github.com/kubernetes/apimachinery/blob/e6c90c4366be1504309a6aafe0d816856450f36a/pkg/apis/meta/v1/types.go#L193-L201) is the timestamp when a pod gets created on K8s:
    
    ```
    // CreationTimestamp is a timestamp representing the server time when this object was
    // created. It is not guaranteed to be set in happens-before order across separate operations.
    // Clients may not set this value. It is represented in RFC3339 form and is in UTC.
    ```
    
    [startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667) is the timestamp when the pod gets started:
    
    ```
    // RFC 3339 date and time at which the object was acknowledged by the Kubelet.
    // This is before the Kubelet pulled the container image(s) for the pod.
    // +optional
    ```
    
    A pending pod's `startTime` is empty. Here is an example of a pending pod:
    
    ```
    NAMESPACE     NAME                                     READY   STATUS    RESTARTS   AGE
    default       pending-pod-example                      0/1     Pending   0          2s
    
    kubectl get pod pending-pod-example -o yaml | grep creationTimestamp
    --->  creationTimestamp: "2021-10-19T16:17:52Z"
    
    // pending pod has no startTime
    kubectl get pod pending-pod-example -o yaml | grep startTime
    ---> // empty
    
    // running pod has startTime set to the timestamp when the pod gets started
    kubectl get pod coredns-558bd4d5db-6qrtx -n kube-system -o yaml | grep startTime
            f:startTime: {}
    ---> startTime: "2021-08-04T23:44:44Z"
    ```
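    
    The old check parsed `startTime` directly, so for a pending pod the absent `startTime` threw inside the `try` block and the catch branch reported the pod as already idle-timed-out, which is why excess pods were deleted after only 29s. A minimal Java sketch of this failure mode (the helper and its signature are hypothetical stand-ins, not the actual Spark or Kubernetes client API):
    
    ```java
    import java.time.Instant;
    
    public class IdleTimeoutSketch {
        // Mirrors the shape of the old isExecutorIdleTimedOut check: parse a
        // timestamp string, compare the elapsed time against the idle timeout,
        // and treat any parse failure as "timed out".
        static boolean isIdleTimedOut(String timestamp, long currentTimeMs, long idleTimeoutMs) {
            try {
                long t = Instant.parse(timestamp).toEpochMilli();
                return currentTimeMs - t > idleTimeoutMs;
            } catch (Exception e) {
                // A pending pod has no startTime, so Instant.parse(null) throws
                // and the pod is treated as timed out immediately.
                return true;
            }
        }
    
        public static void main(String[] args) {
            long now = Instant.parse("2021-10-19T22:52:00Z").toEpochMilli();
            long timeout = 60_000L;
    
            // Pending pod: startTime is absent, so the old check falls into the
            // catch branch and reports a timeout right away.
            System.out.println(isIdleTimedOut(null, now, timeout));                   // prints true
    
            // creationTimestamp is always set, so elapsed time is measured
            // correctly: this pod is only 15s old, well under the 60s timeout.
            System.out.println(isIdleTimedOut("2021-10-19T22:51:45Z", now, timeout)); // prints false
        }
    }
    ```
    
    Switching the check to `creationTimestamp` keeps the same parse-and-compare logic but feeds it a field that Kubernetes populates for every pod, pending or running.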
    
    ### Why are the changes needed?
    This fixes the issue that `spark.dynamicAllocation.executorIdleTimeout` is not honored for pending executor pods.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    The PR includes unit test changes that provide coverage for this issue.
    
    Closes #34319 from yangwwei/SPARK-37049.
    
    Authored-by: Weiwei Yang <wy...@cloudera.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 041cd5d7d15ec4184ae51a8a10a26bef05bd261f)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala       | 8 ++++----
 .../spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala  | 4 +++-
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index c83b8b8..9e1794f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -369,11 +369,11 @@ private[spark] class ExecutorPodsAllocator(
 
   private def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = {
     try {
-      val startTime = Instant.parse(state.pod.getStatus.getStartTime).toEpochMilli()
-      currentTime - startTime > executorIdleTimeout
+      val creationTime = Instant.parse(state.pod.getMetadata.getCreationTimestamp).toEpochMilli()
+      currentTime - creationTime > executorIdleTimeout
     } catch {
-      case _: Exception =>
-        logDebug(s"Cannot get startTime of pod ${state.pod}")
+      case e: Exception =>
+        logError(s"Cannot get the creationTimestamp of the pod: ${state.pod}", e)
         true
     }
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index 41cba57..0b3ce6d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -62,9 +62,11 @@ object ExecutorLifecycleTestUtils {
 
   def pendingExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
     new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
+      .editOrNewMetadata()
+        .withCreationTimestamp(Instant.now.toString)
+        .endMetadata()
       .editOrNewStatus()
         .withPhase("pending")
-        .withStartTime(Instant.now.toString)
         .endStatus()
       .build()
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org