Posted to reviews@spark.apache.org by mccheah <gi...@git.apache.org> on 2018/05/04 22:24:57 UTC

[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

GitHub user mccheah opened a pull request:

    https://github.com/apache/spark/pull/21241

    [SPARK-24135][K8s] Resilience to init-container errors on executors.

    ## What changes were proposed in this pull request?
    
    Spark doesn't attach init-containers. But if a custom web hook or pod preset adds init-containers, we need to be resilient to transient failures of these containers and to at least retry them.
    
    ## How was this patch tested?
    
    Unit tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/palantir/spark handle-init-errors

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21241.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21241
    
----
commit 27cfc943e684ac6c20949b300767bb1b29b496e6
Author: mcheah <mc...@...>
Date:   2018-05-01T16:22:19Z

    [SPARK-24135][K8s] Resilience to init-container errors on executors.
    
    Spark doesn't attach init-containers. But if a custom web hook or pod
    preset adds init-containers, we need to be resilient to transient
    failures of these containers and to at least retry them.

commit c9f7e102dee6ab453f97401d274a05cd23a2c3e2
Author: mcheah <mc...@...>
Date:   2018-05-04T22:16:41Z

    Make the failure count configurable.

commit 52df0f24d66e97be73d57e7121195170e3b0960b
Author: mcheah <mc...@...>
Date:   2018-05-04T22:19:53Z

    Fix compilation

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Kubernetes integration test starting
    URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/2853/



---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by foxish <gi...@git.apache.org>.
Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186642338
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    Maybe have a separate set for all the error states we want to check. Having one place would make this easier to change in future.
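
A minimal sketch of the "separate set" idea, assuming the two status strings from the diff above are the only ones of interest for now. `InitFailureStates` and `isInitFailure` are hypothetical names used for illustration and are not part of the PR.

```scala
// Hypothetical helper, not part of the PR: a single place that lists the
// status strings treated as init-container failures.
object InitFailureStates {
  // The two values are taken from the diff above; extend the set here
  // when another error state needs to be handled.
  val states: Set[String] = Set("Init:Error", "Init:CrashLoopBackoff")

  // Null-safe check: a missing status never counts as an init failure.
  def isInitFailure(status: String): Boolean =
    status != null && states.contains(status)
}
```

With something like this, the guard in `eventReceived` could call `InitFailureStates.isInitFailure(podPhase)` rather than chaining string comparisons.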


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186852387
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    --- End diff --
    
    Even running at degraded capacity can be concerning, though. What if we only get 1 executor for an application that's processing terabytes of data? I don't think there's an intuitive default here, though perhaps if we're at, say, 75% of requested capacity (or some other arbitrary threshold) then we can continue. Alternatively, we can just retry requesting executors forever and never treat these executor startup failures as real problems (which would remove this if statement, for example). I don't think it's entirely unreasonable to always retry requesting executors when they fail to start up.
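
For illustration only, the capacity-based alternative could look roughly like the sketch below. `ExecutorCapacity`, `hasEnoughCapacity`, and the `minFraction` parameter are hypothetical, and the 0.75 default simply mirrors the "say 75%" figure floated in the comment; none of this is in the PR.

```scala
// Hypothetical policy sketch, not part of the PR.
object ExecutorCapacity {
  // Keep the application running only while at least `minFraction` of the
  // requested executors are up. The 0.75 default is an assumption taken
  // from the discussion, not a value defined by Spark.
  def hasEnoughCapacity(running: Int, requested: Int, minFraction: Double = 0.75): Boolean = {
    require(running >= 0 && requested >= 0, "counts must be non-negative")
    require(minFraction >= 0.0 && minFraction <= 1.0, "minFraction must be in [0, 1]")
    // With nothing requested there is nothing to fall short of.
    requested == 0 || running.toDouble / requested >= minFraction
  }
}
```

The "retry forever" alternative would not need a check like this at all, which is the trade-off the comment describes.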


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    **[Test build #94354 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94354/testReport)** for PR 21241 at commit [`9df84e8`](https://github.com/apache/spark/commit/9df84e87a36bd6b43b0f74c26ad3bf70a67bb467).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by erikerlandson <gi...@git.apache.org>.
Github user erikerlandson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186855467
  
    --- Diff: docs/running-on-kubernetes.md ---
    @@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
         This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task 
         parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
     </tr>
    +<tr>
    +  <td><code>spark.kubernetes.executor.maxInitFailures</code></td>
    +  <td>10</td>
    +  <td>
    +    Maximum number of times executors are allowed to fail with an Init:Error state before failing the application. Note that Init:Error failures should not be caused by Spark itself because Spark does not attach init-containers to pods. Init-containers can be attached by the cluster itself. Users should check with their cluster administrator if these kinds of failures to start the executor pod occur frequently.
    --- End diff --
    
    As long as it's relatively easy to extend, generalizing on a case-by-case basis should be OK.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by liyinan926 <gi...@git.apache.org>.
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186790517
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    +            throw new SparkException(errorMessage)
               }
    +          handleFailedPod(action, pod, podName, podIP)
     
    -          val executorExitReason = if (action == Action.ERROR) {
    -            logWarning(s"Received error event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnError(pod)
    -          } else if (action == Action.DELETED) {
    -            logWarning(s"Received delete event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnDelete(pod)
    -          } else {
    -            throw new IllegalStateException(
    -              s"Unknown action that should only be DELETED or ERROR: $action")
    -          }
    -          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
    -
    -          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
    -            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
    -              s"watch received an event of type $action for this executor. The executor may " +
    -              "have failed to start in the first place and never registered with the driver.")
    -          }
    -          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
    +        case Action.DELETED | Action.ERROR =>
    +          handleFailedPod(action, pod, podName, podIP)
     
             case _ => logDebug(s"Received event of executor pod $podName: " + action)
           }
         }
     
    +    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
    +      val executorId = getExecutorId(pod)
    +      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    +      if (podIP != null) {
    +        executorPodsByIPs.remove(podIP)
    +      }
    +
    +      val executorExitReason = if (action == Action.ERROR) {
    +        logWarning(s"Received error event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnError(pod)
    +      } else if (action == Action.DELETED) {
    +        logWarning(s"Received delete event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnDelete(pod)
    +      } else if (action == Action.MODIFIED) {
    +        executorExitReasonOnInitError(pod)
    --- End diff --
    
    I think this is the relevant reference: https://kubernetes.io/docs/tasks/debug-application-cluster/debug-init-containers/#understanding-pod-status.


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    **[Test build #90222 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90222/testReport)** for PR 21241 at commit [`52df0f2`](https://github.com/apache/spark/commit/52df0f24d66e97be73d57e7121195170e3b0960b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by liyinan926 <gi...@git.apache.org>.
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186866387
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    Yes, you will see it with `kubectl describe`, but I doubt you will see it with the client library. This whole PR makes little sense if those values are never seen. I think you need to check both whether the pod phase is `Failed` and whether the `Initialized` condition is `true` to determine if the pod failed on any init-container.


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    **[Test build #90224 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90224/testReport)** for PR 21241 at commit [`9df84e8`](https://github.com/apache/spark/commit/9df84e87a36bd6b43b0f74c26ad3bf70a67bb467).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    **[Test build #90222 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90222/testReport)** for PR 21241 at commit [`52df0f2`](https://github.com/apache/spark/commit/52df0f24d66e97be73d57e7121195170e3b0960b).


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by foxish <gi...@git.apache.org>.
Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186643210
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    +            throw new SparkException(errorMessage)
               }
    +          handleFailedPod(action, pod, podName, podIP)
     
    -          val executorExitReason = if (action == Action.ERROR) {
    -            logWarning(s"Received error event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnError(pod)
    -          } else if (action == Action.DELETED) {
    -            logWarning(s"Received delete event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnDelete(pod)
    -          } else {
    -            throw new IllegalStateException(
    -              s"Unknown action that should only be DELETED or ERROR: $action")
    -          }
    -          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
    -
    -          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
    -            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
    -              s"watch received an event of type $action for this executor. The executor may " +
    -              "have failed to start in the first place and never registered with the driver.")
    -          }
    -          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
    +        case Action.DELETED | Action.ERROR =>
    +          handleFailedPod(action, pod, podName, podIP)
     
             case _ => logDebug(s"Received event of executor pod $podName: " + action)
           }
         }
     
    +    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
    +      val executorId = getExecutorId(pod)
    +      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    +      if (podIP != null) {
    +        executorPodsByIPs.remove(podIP)
    +      }
    +
    +      val executorExitReason = if (action == Action.ERROR) {
    +        logWarning(s"Received error event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnError(pod)
    +      } else if (action == Action.DELETED) {
    +        logWarning(s"Received delete event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnDelete(pod)
    +      } else if (action == Action.MODIFIED) {
    +        executorExitReasonOnInitError(pod)
    --- End diff --
    
    I think `Action.MODIFIED` can be fired for a lot of other reasons. I was thinking that we should just use container exit status, and have that reflect either the main/init container status depending on which container failed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186252592
  
    --- Diff: docs/running-on-kubernetes.md ---
    @@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
         This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task 
         parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
     </tr>
    +<tr>
    +  <td><code>spark.kubernetes.executor.maxInitFailures</code></td>
    +  <td>10</td>
    --- End diff --
    
    Instead of a fixed number, should this be expressed as a factor on the total number of executors?
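
One way to read this suggestion, sketched under assumptions: the limit is derived from the requested executor count and a multiplier. `InitFailureLimit`, `maxInitFailures`, and `factor` are hypothetical names; no such configuration key is defined in the PR.

```scala
// Hypothetical sketch, not part of the PR.
object InitFailureLimit {
  // Derive the allowed number of init failures from the requested executor
  // count instead of a fixed 10. `factor` is an assumed setting, not an
  // existing Spark configuration key.
  def maxInitFailures(totalExecutors: Int, factor: Double): Int = {
    require(totalExecutors >= 0, "totalExecutors must be non-negative")
    require(factor > 0.0, "factor must be positive")
    // Round up, and always tolerate at least one failure.
    math.max(1, math.ceil(totalExecutors * factor).toInt)
  }
}
```

A relative limit scales with application size, at the cost of being less obvious to users than a fixed count.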


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by liyinan926 <gi...@git.apache.org>.
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186862624
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    I took a look at the client code, and it appears to me that `getPhase` simply returns the value of the JSON property `phase` of `PodStatus`. Have you seen `Init:Error` as the return value in practice?


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    @foxish @liyinan926 @eje please take a look.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by foxish <gi...@git.apache.org>.
Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186869138
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    The termination reason is also a good source of info. kubectl looks at a set of these and turns it into what you see in the describe output - so, [similar logic](https://github.com/kubernetes/kubernetes/blob/6b94e872c63eeea2ed4fdc510c008e4ff9713953/pkg/printers/internalversion/printers.go#L547-L573) could be exercised. 
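
A rough sketch of what "similar logic" might look like on the Spark side, loosely modeled on how kubectl derives its display status from init-container states. The types below are simplified stand-ins invented for this example, not the fabric8 client model classes, and `initFailureReason` is a hypothetical helper.

```scala
// Hypothetical sketch, not part of the PR.
object InitStatus {
  // Simplified stand-ins for a container's state; illustrative only.
  sealed trait ContainerState
  final case class Terminated(exitCode: Int, reason: Option[String]) extends ContainerState
  final case class Waiting(reason: Option[String]) extends ContainerState
  case object Running extends ContainerState

  // Returns a display-style reason such as "Init:Error" for the first init
  // container whose state looks like a failure, or None if there is none.
  def initFailureReason(initStates: Seq[ContainerState]): Option[String] =
    initStates.collectFirst {
      // A non-zero exit code means the init container failed; fall back to
      // the exit code when no termination reason was reported.
      case Terminated(code, reason) if code != 0 =>
        "Init:" + reason.filter(_.nonEmpty).getOrElse(s"ExitCode:$code")
      // A waiting container with a reason other than normal initialization,
      // for example a crash loop, is also treated as a failure.
      case Waiting(Some(reason)) if reason.nonEmpty && reason != "PodInitializing" =>
        "Init:" + reason
    }
}
```

Deriving the reason from container states rather than from the pod phase would sidestep the question of whether `getPhase` ever returns an `Init:` value.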


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186793785
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    +            throw new SparkException(errorMessage)
               }
    +          handleFailedPod(action, pod, podName, podIP)
     
    -          val executorExitReason = if (action == Action.ERROR) {
    -            logWarning(s"Received error event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnError(pod)
    -          } else if (action == Action.DELETED) {
    -            logWarning(s"Received delete event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnDelete(pod)
    -          } else {
    -            throw new IllegalStateException(
    -              s"Unknown action that should only be DELETED or ERROR: $action")
    -          }
    -          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
    -
    -          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
    -            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
    -              s"watch received an event of type $action for this executor. The executor may " +
    -              "have failed to start in the first place and never registered with the driver.")
    -          }
    -          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
    +        case Action.DELETED | Action.ERROR =>
    +          handleFailedPod(action, pod, podName, podIP)
     
             case _ => logDebug(s"Received event of executor pod $podName: " + action)
           }
         }
     
    +    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
    +      val executorId = getExecutorId(pod)
    +      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    +      if (podIP != null) {
    +        executorPodsByIPs.remove(podIP)
    +      }
    +
    +      val executorExitReason = if (action == Action.ERROR) {
    +        logWarning(s"Received error event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnError(pod)
    +      } else if (action == Action.DELETED) {
    +        logWarning(s"Received delete event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnDelete(pod)
    +      } else if (action == Action.MODIFIED) {
    +        executorExitReasonOnInitError(pod)
    --- End diff --
    
    That only covers init-containers - I think @foxish is referring to other classes of initialization errors, like failing to pull the image.
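
To make the distinction concrete, here is a minimal sketch of how startup failure reasons could be grouped into classes. It is not code from this patch: `StartupFailure`, `StartupFailureClassifier`, and `classify` are hypothetical names, and the `Init:` prefix check simply mirrors the status strings matched in the diff above. `ErrImagePull` and `ImagePullBackOff` are container waiting reasons reported by Kubernetes.

```scala
// Illustrative only: a plain-string classifier for executor startup failures.
// All names here are hypothetical and do not exist in Spark.
sealed trait StartupFailure
case object InitContainerFailure extends StartupFailure
case object ImagePullFailure extends StartupFailure

object StartupFailureClassifier {
  // Waiting reasons Kubernetes reports when an image cannot be pulled.
  private val imagePullReasons = Set("ErrImagePull", "ImagePullBackOff")

  /** Returns the failure class for a status or reason string, if it is a known one. */
  def classify(reason: String): Option[StartupFailure] =
    if (reason == null) None
    else if (reason.startsWith("Init:")) Some(InitContainerFailure)
    else if (imagePullReasons.contains(reason)) Some(ImagePullFailure)
    else None
}
```

Keeping the known reasons in one place would make it easy to grow the set as more root causes are found.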


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186853331
  
    --- Diff: docs/running-on-kubernetes.md ---
    @@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
         This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task 
         parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
     </tr>
    +<tr>
    +  <td><code>spark.kubernetes.executor.maxInitFailures</code></td>
    +  <td>10</td>
    +  <td>
    +    Maximum number of times executors are allowed to fail with an Init:Error state before failing the application. Note that Init:Error failures should not be caused by Spark itself because Spark does not attach init-containers to pods. Init-containers can be attached by the cluster itself. Users should check with their cluster administrator if these kinds of failures to start the executor pod occur frequently.
    --- End diff --
    
    We can also start with a minimal set and keep adding to it as we find more root causes.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by liyinan926 <gi...@git.apache.org>.
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186782288
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    +            throw new SparkException(errorMessage)
               }
    +          handleFailedPod(action, pod, podName, podIP)
     
    -          val executorExitReason = if (action == Action.ERROR) {
    -            logWarning(s"Received error event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnError(pod)
    -          } else if (action == Action.DELETED) {
    -            logWarning(s"Received delete event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnDelete(pod)
    -          } else {
    -            throw new IllegalStateException(
    -              s"Unknown action that should only be DELETED or ERROR: $action")
    -          }
    -          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
    -
    -          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
    -            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
    -              s"watch received an event of type $action for this executor. The executor may " +
    -              "have failed to start in the first place and never registered with the driver.")
    -          }
    -          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
    +        case Action.DELETED | Action.ERROR =>
    +          handleFailedPod(action, pod, podName, podIP)
     
             case _ => logDebug(s"Received event of executor pod $podName: " + action)
           }
         }
     
    +    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
    +      val executorId = getExecutorId(pod)
    +      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    +      if (podIP != null) {
    +        executorPodsByIPs.remove(podIP)
    +      }
    +
    +      val executorExitReason = if (action == Action.ERROR) {
    +        logWarning(s"Received error event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnError(pod)
    +      } else if (action == Action.DELETED) {
    +        logWarning(s"Received delete event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnDelete(pod)
    +      } else if (action == Action.MODIFIED) {
    +        executorExitReasonOnInitError(pod)
    --- End diff --
    
    I would suggest separating out the handling of pods that have failed init-container(s) into its own method.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186789848
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    +            throw new SparkException(errorMessage)
               }
    +          handleFailedPod(action, pod, podName, podIP)
     
    -          val executorExitReason = if (action == Action.ERROR) {
    -            logWarning(s"Received error event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnError(pod)
    -          } else if (action == Action.DELETED) {
    -            logWarning(s"Received delete event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnDelete(pod)
    -          } else {
    -            throw new IllegalStateException(
    -              s"Unknown action that should only be DELETED or ERROR: $action")
    -          }
    -          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
    -
    -          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
    -            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
    -              s"watch received an event of type $action for this executor. The executor may " +
    -              "have failed to start in the first place and never registered with the driver.")
    -          }
    -          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
    +        case Action.DELETED | Action.ERROR =>
    +          handleFailedPod(action, pod, podName, podIP)
     
             case _ => logDebug(s"Received event of executor pod $podName: " + action)
           }
         }
     
    +    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
    +      val executorId = getExecutorId(pod)
    +      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    +      if (podIP != null) {
    +        executorPodsByIPs.remove(podIP)
    +      }
    +
    +      val executorExitReason = if (action == Action.ERROR) {
    +        logWarning(s"Received error event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnError(pod)
    +      } else if (action == Action.DELETED) {
    +        logWarning(s"Received delete event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnDelete(pod)
    +      } else if (action == Action.MODIFIED) {
    +        executorExitReasonOnInitError(pod)
    --- End diff --
    
    What's the full set of error types that we should be tracking here? Are there docs that have a full enumerated list?


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186790719
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    --- End diff --
    
    The problem is that executors that fail to start up don't produce task failures, so they never cause the job(s) to stop. Executors failing to start up also affect every job in the process. So when we hit these initialization errors too many times, I think we want to shut down the Spark context and fail all of the jobs. The alternative is to stop requesting new executors entirely, but that state isn't desirable: for example, the Spark application could be left running with 0 executors and appear stuck for no apparent reason.
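
The counting behaviour described here can be sketched in isolation. This is a simplified illustration rather than the patch itself: `InitFailureTracker` and `recordFailure` are hypothetical names, and the caller is assumed to stop the Spark context when `recordFailure` returns true. Like `failedInitExecutors` in the diff, it counts distinct executor IDs, so repeated events for the same executor count once.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of the patch: tracks distinct executors whose
// pods failed during initialization and reports when the limit is reached.
// Not thread-safe; assumes all watch events arrive on a single thread.
final class InitFailureTracker(maxInitErrors: Int) {
  require(maxInitErrors > 0, "maxInitErrors must be positive")

  private val failedExecutorIds = mutable.Set.empty[String]

  /** Records a failure; returns true once the application should be aborted. */
  def recordFailure(executorId: String): Boolean = {
    failedExecutorIds += executorId
    failedExecutorIds.size >= maxInitErrors
  }

  /** Number of distinct executors that have failed to initialize so far. */
  def failureCount: Int = failedExecutorIds.size
}
```

With a limit of 2, two failure events for executor "1" do not trigger an abort, but a subsequent failure for executor "2" does.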


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    **[Test build #90224 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90224/testReport)** for PR 21241 at commit [`9df84e8`](https://github.com/apache/spark/commit/9df84e87a36bd6b43b0f74c26ad3bf70a67bb467).


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Kubernetes integration test status success
    URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/2853/



---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by foxish <gi...@git.apache.org>.
Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186857969
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    --- End diff --
    
    Always re-requesting executors has the potential to overwhelm etcd with failed pods, especially for long-running jobs. Failing the entire task upon N failures seems overly conservative, but it is a reasonable place to start. With dynamic allocation, it should be possible to check that at least `spark.dynamicAllocation.minExecutors` executors are alive and make a decision accordingly.
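
One possible shape for that decision is sketched below. The comment does not specify the policy, so everything here is an assumption: the names `StartupFailurePolicy`, `decide`, and the three outcomes are hypothetical, and the extra `aliveExecutors > 0` guard is added only to avoid the "stuck with 0 executors" state raised elsewhere in this thread.

```scala
// Hypothetical decision helper; the names and the policy are illustrative only.
sealed trait StartupFailureDecision
case object KeepRequesting extends StartupFailureDecision
case object StopRequesting extends StartupFailureDecision
case object AbortApplication extends StartupFailureDecision

object StartupFailurePolicy {
  /**
   * Below the failure limit, keep requesting executors. At or above it, stop
   * requesting if enough executors are alive to make progress; otherwise abort.
   */
  def decide(
      initFailures: Int,
      maxInitFailures: Int,
      aliveExecutors: Int,
      minExecutors: Int): StartupFailureDecision =
    if (initFailures < maxInitFailures) KeepRequesting
    else if (aliveExecutors > 0 && aliveExecutors >= minExecutors) StopRequesting
    else AbortApplication
}
```

Under this sketch an application that has hit the limit but still has at least the minimum number of live executors keeps running without creating more failed pods, while one with no live executors is aborted.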


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2929/
    Test FAILed.


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    **[Test build #94354 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94354/testReport)** for PR 21241 at commit [`9df84e8`](https://github.com/apache/spark/commit/9df84e87a36bd6b43b0f74c26ad3bf70a67bb467).


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by foxish <gi...@git.apache.org>.
Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186879387
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    --- End diff --
    
    The controllers have rate-limiting queues with exponential backoff. In the past, we've had issues (https://github.com/kubernetes/kubernetes/issues/30628, https://github.com/kubernetes/kubernetes/issues/27634 and many more) where a misconfigured queue has caused controllers to spew pods and retry. 
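
For readers unfamiliar with the term, capped exponential backoff can be sketched as follows. This is a generic illustration, not the Kubernetes controller implementation: `Backoff`, `delayMs`, and the parameter names are hypothetical.

```scala
// Generic capped exponential backoff; illustrative names, not a Kubernetes API.
object Backoff {
  /** Delay in milliseconds before retry number `attempt` (0-based), capped at maxDelayMs. */
  def delayMs(attempt: Int, baseDelayMs: Long, maxDelayMs: Long): Long = {
    require(attempt >= 0, "attempt must be non-negative")
    require(baseDelayMs > 0 && maxDelayMs >= baseDelayMs, "invalid delay bounds")
    var delay = baseDelayMs
    var i = 0
    // Double the delay once per attempt, stopping at the cap to avoid overflow.
    while (i < attempt && delay < maxDelayMs) {
      delay = if (delay > maxDelayMs / 2) maxDelayMs else delay * 2
      i += 1
    }
    delay
  }
}
```

With a 100 ms base and a 5000 ms cap, the delays grow as 100, 200, 400, 800, 1600, 3200 and then stay at 5000. A retry loop without such a cap and backoff is the kind of misconfiguration that lets a controller create pods in a tight loop.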


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90224/
    Test PASSed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186252598
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    --- End diff --
    
    nit: no need for the `s` prefix on this line, since the string has no interpolated values.


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by foxish <gi...@git.apache.org>.
Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186861330
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    +            throw new SparkException(errorMessage)
               }
    +          handleFailedPod(action, pod, podName, podIP)
     
    -          val executorExitReason = if (action == Action.ERROR) {
    -            logWarning(s"Received error event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnError(pod)
    -          } else if (action == Action.DELETED) {
    -            logWarning(s"Received delete event of executor pod $podName. Reason: " +
    -              pod.getStatus.getReason)
    -            executorExitReasonOnDelete(pod)
    -          } else {
    -            throw new IllegalStateException(
    -              s"Unknown action that should only be DELETED or ERROR: $action")
    -          }
    -          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
    -
    -          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
    -            log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
    -              s"watch received an event of type $action for this executor. The executor may " +
    -              "have failed to start in the first place and never registered with the driver.")
    -          }
    -          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
    +        case Action.DELETED | Action.ERROR =>
    +          handleFailedPod(action, pod, podName, podIP)
     
             case _ => logDebug(s"Received event of executor pod $podName: " + action)
           }
         }
     
    +    private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = {
    +      val executorId = getExecutorId(pod)
    +      logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    +      if (podIP != null) {
    +        executorPodsByIPs.remove(podIP)
    +      }
    +
    +      val executorExitReason = if (action == Action.ERROR) {
    +        logWarning(s"Received error event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnError(pod)
    +      } else if (action == Action.DELETED) {
    +        logWarning(s"Received delete event of executor pod $podName. Reason: " +
    +          pod.getStatus.getReason)
    +        executorExitReasonOnDelete(pod)
    +      } else if (action == Action.MODIFIED) {
    +        executorExitReasonOnInitError(pod)
    --- End diff --
    
    There isn't a doc, but I'm putting together an initial list. We can grow it as we discover more during operations. Would be good to 0/1 nodes are available: 1 node(s) had disk pressure.



---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2930/
    Test PASSed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by erikerlandson <gi...@git.apache.org>.
Github user erikerlandson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186839692
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    --- End diff --
    
    I'm unsure how elaborate we can/should get with this, but a basic test from the back-end side for context seems like "are there any registered executors", which isn't perfect but might be a decent heuristic for whether it's worth trying to continue or just fail the job entirely.  If there are no currently registered executors, and we hit a failure threshold, just failing out altogether seems like a sane response. If there are executors running (satisfying configured minimum?), then continuing might be reasonable since that is graceful degradation.
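
The heuristic described above can be sketched as a small decision function. This is only an illustration: the names `decide`, `Continue`, and `Abort`, and all four parameters, are hypothetical and do not come from the PR, and how the backend would obtain these counts is left open.

```scala
object StartupPolicySketch {
  // Possible outcomes once an executor fails to initialize.
  sealed trait Decision
  case object Continue extends Decision
  case object Abort extends Decision

  // Hypothetical policy: below the failure threshold, keep going. At or above
  // it, keep going only if some executors are registered and they satisfy the
  // configured minimum (graceful degradation); otherwise fail the application.
  def decide(
      failedInits: Int,
      maxInitFailures: Int,
      registeredExecutors: Int,
      minExecutors: Int): Decision = {
    if (failedInits < maxInitFailures) {
      Continue
    } else if (registeredExecutors > 0 && registeredExecutors >= minExecutors) {
      Continue
    } else {
      Abort
    }
  }
}
```

Treating "no registered executors" as the abort condition keeps the policy simple, at the cost of continuing with whatever capacity happens to be registered when the threshold is reached.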


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186825312
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    The Kubernetes client doesn't use any enumerations from the underlying API; it only takes the raw strings in the response body. So if the response gives us those values, we should be fine.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by foxish <gi...@git.apache.org>.
Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186641837
  
    --- Diff: docs/running-on-kubernetes.md ---
    @@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
         This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task 
         parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
     </tr>
    +<tr>
    +  <td><code>spark.kubernetes.executor.maxInitFailures</code></td>
    +  <td>10</td>
    +  <td>
    +    Maximum number of times executors are allowed to fail with an Init:Error state before failing the application. Note that Init:Error failures should not be caused by Spark itself because Spark does not attach init-containers to pods. Init-containers can be attached by the cluster itself. Users should check with their cluster administrator if these kinds of failures to start the executor pod occur frequently.
    --- End diff --
    
    This is very specific and covering exactly one kind of error. I'd like this property to cover all initialization errors.


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90222/
    Test PASSed.


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by foxish <gi...@git.apache.org>.
Github user foxish commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186642656
  
    --- Diff: docs/running-on-kubernetes.md ---
    @@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
         This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task 
         parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
     </tr>
    +<tr>
    +  <td><code>spark.kubernetes.executor.maxInitFailures</code></td>
    +  <td>10</td>
    +  <td>
    +    Maximum number of times executors are allowed to fail with an Init:Error state before failing the application. Note that Init:Error failures should not be caused by Spark itself because Spark does not attach init-containers to pods. Init-containers can be attached by the cluster itself. Users should check with their cluster administrator if these kinds of failures to start the executor pod occur frequently.
    --- End diff --
    
    Also, I would change the description here, because injecting init-containers through mutating webhooks is not all that common. It should also link to https://kubernetes.io/docs/admin/extensible-admission-controllers/


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by liyinan926 <gi...@git.apache.org>.
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186780738
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    --- End diff --
    
    Why do you need to call this?


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by liyinan926 <gi...@git.apache.org>.
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186778945
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    I think it's better to have a method that determines if it's an initialization error.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by liyinan926 <gi...@git.apache.org>.
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186872150
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    Yes, we need similar logic here. The status values with the form `Init:<...>` are printed forms and won't be returned by the client library.
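
A minimal sketch of the kind of helper suggested in this thread, assuming the failure has to be derived from the init containers' states rather than from the pod phase. `InitContainerState` and `isInitFailure` are hypothetical stand-ins, not part of the PR or of the fabric8 client. The `CrashLoopBackOff` waiting reason and the non-zero exit code check are assumptions about what lies behind the printed `Init:...` statuses.

```scala
object InitFailureSketch {
  // Hypothetical, simplified view of one init container's state. A real
  // implementation would read the equivalent fields from the pod status
  // returned by the client library.
  final case class InitContainerState(
      waitingReason: Option[String],   // set while the container is waiting
      terminatedExitCode: Option[Int]) // set once the container has terminated

  // Waiting reasons counted as failures in this sketch (assumed, not exhaustive).
  private val failedWaitingReasons = Set("CrashLoopBackOff")

  // True if any init container exited with a non-zero code or is waiting
  // for one of the reasons listed above.
  def isInitFailure(initContainers: Seq[InitContainerState]): Boolean =
    initContainers.exists { state =>
      state.terminatedExitCode.exists(_ != 0) ||
        state.waitingReason.exists(failedWaitingReasons.contains)
    }
}
```

Keeping the set of failure reasons in one place would let the list grow as more initialization errors are identified, without touching the watcher's pattern match.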


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by liyinan926 <gi...@git.apache.org>.
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186797151
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    Are `Init:Error` and `Init:CrashLoopBackoff` valid `PodPhase` values? I don't see them in https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/api/core/v1/types.go#L2215. It appears to me that those are the printed form of `PodStatus`. Or does the fabric8 client we use return those?


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186864312
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    --- End diff --
    
    Seen it using `kubectl`, haven't tried hitting the API directly. When we start seeing these kinds of errors come up again, we can check our pod YAML.


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Build finished. Test FAILed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186853242
  
    --- Diff: docs/running-on-kubernetes.md ---
    @@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
         This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task 
         parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
     </tr>
    +<tr>
    +  <td><code>spark.kubernetes.executor.maxInitFailures</code></td>
    +  <td>10</td>
    +  <td>
    +    Maximum number of times executors are allowed to fail with an Init:Error state before failing the application. Note that Init:Error failures should not be caused by Spark itself because Spark does not attach init-containers to pods. Init-containers can be attached by the cluster itself. Users should check with their cluster administrator if these kinds of failures to start the executor pod occur frequently.
    --- End diff --
    
    As per https://github.com/apache/spark/pull/21241#discussion_r186789848 I think it's important to define what that full set of error types is. Do we have a comprehensive list we can follow?


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by erikerlandson <gi...@git.apache.org>.
Github user erikerlandson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186846341
  
    --- Diff: docs/running-on-kubernetes.md ---
    @@ -561,6 +561,13 @@ specific to Spark on Kubernetes.
         This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task 
         parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
     </tr>
    +<tr>
    +  <td><code>spark.kubernetes.executor.maxInitFailures</code></td>
    +  <td>10</td>
    +  <td>
    +    Maximum number of times executors are allowed to fail with an Init:Error state before failing the application. Note that Init:Error failures should not be caused by Spark itself because Spark does not attach init-containers to pods. Init-containers can be attached by the cluster itself. Users should check with their cluster administrator if these kinds of failures to start the executor pod occur frequently.
    --- End diff --
    
    I'm in agreement w/ @foxish about designing for wider categories of error, which can be future-proofed


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94354/
    Test FAILed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by erikerlandson <gi...@git.apache.org>.
Github user erikerlandson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186859389
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    --- End diff --
    
    Leaning on configured minimum executors could be construed as a way to let the user express application-dependent throughput minimums. If the use case is terabyte-scale, then setting minimum executors appropriately would address that.  A %-threshold is also reasonable, it just adds a new knob.
    
    Requesting new executors indefinitely seems plausible, as long as failed pods don't accumulate. Having a submission hang indefinitely while Kubernetes churns may be confusing, at least for users who are not familiar with Kubernetes and hope to treat it transparently. Maybe the Mesos backend policies could provide an analogy?
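
The %-threshold alternative mentioned above could look roughly like the sketch below. Every name in it (`exceedsThreshold`, `maxFailedFraction`, and the two counters) is hypothetical; no such setting is proposed in the PR, and the choice of denominator (all pods requested so far) is an assumption.

```scala
object FailureFractionSketch {
  // Hypothetical check: true when the share of requested executor pods that
  // failed to start is strictly greater than the allowed fraction.
  def exceedsThreshold(
      failedPods: Int,
      requestedPods: Int,
      maxFailedFraction: Double): Boolean = {
    require(maxFailedFraction >= 0.0 && maxFailedFraction <= 1.0,
      "maxFailedFraction must be between 0 and 1")
    // With no pods requested yet there is nothing to judge.
    requestedPods > 0 && failedPods.toDouble / requestedPods > maxFailedFraction
  }
}
```

A fraction scales with the size of the application, whereas an absolute count such as the default of 10 means very different things for a 20-executor job and a 2000-executor job.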


---



[GitHub] spark issue #21241: [SPARK-24135][K8s] Resilience to init-container errors o...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21241
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21241#discussion_r186876513
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
    @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend(
         override def eventReceived(action: Action, pod: Pod): Unit = {
           val podName = pod.getMetadata.getName
           val podIP = pod.getStatus.getPodIP
    -
    +      val podPhase = pod.getStatus.getPhase
           action match {
    -        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
    +        case Action.MODIFIED if (podPhase == "Running"
                 && pod.getMetadata.getDeletionTimestamp == null) =>
               val clusterNodeName = pod.getSpec.getNodeName
               logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
               executorPodsByIPs.put(podIP, pod)
     
    -        case Action.DELETED | Action.ERROR =>
    +        case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff")
    +          && pod.getMetadata.getDeletionTimestamp == null =>
               val executorId = getExecutorId(pod)
    -          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
    -          if (podIP != null) {
    -            executorPodsByIPs.remove(podIP)
    +          failedInitExecutors.add(executorId)
    +          if (failedInitExecutors.size >= executorMaxInitErrors) {
    +            val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" +
    +              s" executors failed to start. The maximum number of allowed startup failures is" +
    +              s" $executorMaxInitErrors. Please contact your cluster administrator or increase" +
    +              s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}."
    +            logError(errorMessage)
    +            KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread()
    --- End diff --
    
    @foxish out of curiosity, how do the replication controller, replica set, and similar controllers handle the same problem? Presumably there would also be heavy etcd load if pods always fail to start up and the controller keeps trying to create them.


---
