You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/02/21 11:36:35 UTC

[linkis] branch dev-1.3.2 updated: [Bugfix] The spark jar job status is correctly obtained (#4249)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 9887de3e7 [Bugfix] The spark jar job status is correctly obtained (#4249)
9887de3e7 is described below

commit 9887de3e75f3eb483c02ca3b5d91855b1cf48c16
Author: 人生有如两个橘子 <15...@163.com>
AuthorDate: Tue Feb 21 19:36:31 2023 +0800

    [Bugfix] The spark jar job status is correctly obtained (#4249)
    
    * [bugfix] The spark jar job status is not correctly obtained
---
 .../deployment/ClusterDescriptorAdapter.java       | 16 ++---
 .../YarnApplicationClusterDescriptorAdapter.java   | 62 ++++++++-----------
 .../spark/executor/SparkOnceExecutor.scala         | 71 ++++++++--------------
 .../spark/executor/SparkSubmitOnceExecutor.scala   |  4 ++
 .../spark/launcher/CustomSparkSubmitLauncher.scala | 34 +++++++++++
 5 files changed, 93 insertions(+), 94 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
index 4607f2e4b..16b7613cf 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
@@ -19,8 +19,8 @@ package org.apache.linkis.engineplugin.spark.client.deployment;
 
 import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
 
+import org.apache.spark.launcher.CustomSparkSubmitLauncher;
 import org.apache.spark.launcher.SparkAppHandle;
-import org.apache.spark.launcher.SparkLauncher;
 
 import java.io.Closeable;
 
@@ -32,7 +32,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
 
   protected final ExecutionContext executionContext;
   protected String applicationId;
-  protected SparkLauncher sparkLauncher;
+  protected CustomSparkSubmitLauncher sparkLauncher;
   protected SparkAppHandle sparkAppHandle;
   protected SparkAppHandle.State jobState;
 
@@ -49,14 +49,8 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
     return jobState;
   }
 
-  /** Cancel the spark job. */
-  public void cancelJob() {
-    if (sparkAppHandle != null) {
-      logger.info("Start to cancel job {}.", sparkAppHandle.getAppId());
-      this.stopJob();
-    } else {
-      logger.warn("Cancel job: sparkAppHandle is null");
-    }
+  public boolean isDisposed() {
+    return sparkLauncher.isDisposed();
   }
 
   @Override
@@ -78,7 +72,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
     if (sparkAppHandle == null) {
       return;
     }
-    if (sparkAppHandle.getState().isFinal()) {
+    if (isDisposed()) {
       logger.info("Job has finished, stop action return.");
       return;
     }
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
index 1cdfb374e..21f2885b4 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
@@ -21,13 +21,13 @@ import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
 import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.launcher.CustomSparkSubmitLauncher;
 import org.apache.spark.launcher.SparkAppHandle;
 import org.apache.spark.launcher.SparkLauncher;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 
 public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
 
@@ -36,39 +36,10 @@ public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAd
   }
 
   public void deployCluster(String mainClass, String args, Map<String, String> confMap)
-      throws IOException, InterruptedException {
+      throws IOException {
     SparkConfig sparkConfig = executionContext.getSparkConfig();
 
-    CountDownLatch countDownLatch = new CountDownLatch(1);
-    SparkAppHandle.Listener listener =
-        new SparkAppHandle.Listener() {
-          @Override
-          public void stateChanged(SparkAppHandle sparkAppHandle) {
-            jobState = sparkAppHandle.getState();
-            if (sparkAppHandle.getAppId() != null) {
-              countDownLatch.countDown();
-              applicationId = sparkAppHandle.getAppId();
-              logger.info("{} stateChanged: {}", applicationId, jobState.toString());
-            } else {
-              if (jobState.isFinal()) {
-                countDownLatch.countDown();
-              }
-              logger.info("stateChanged: {}", jobState.toString());
-            }
-          }
-
-          @Override
-          public void infoChanged(SparkAppHandle sparkAppHandle) {
-            jobState = sparkAppHandle.getState();
-            if (sparkAppHandle.getAppId() != null) {
-              logger.info("{} infoChanged: {}", sparkAppHandle.getAppId(), jobState.toString());
-            } else {
-              logger.info("infoChanged: {}", jobState.toString());
-            }
-          }
-        };
-
-    sparkLauncher = new SparkLauncher();
+    sparkLauncher = new CustomSparkSubmitLauncher();
     // region set args
     sparkLauncher
         .setJavaHome(sparkConfig.getJavaHome())
@@ -105,9 +76,25 @@ public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAd
         .filter(StringUtils::isNotBlank)
         .forEach(arg -> sparkLauncher.addAppArgs(arg));
     // sparkLauncher.addAppArgs(args);
-    // endregion
-    sparkAppHandle = sparkLauncher.startApplication(listener);
-    countDownLatch.await();
+    sparkAppHandle =
+        sparkLauncher.startApplication(
+            new SparkAppHandle.Listener() {
+              @Override
+              public void stateChanged(SparkAppHandle sparkAppHandle) {
+                jobState = sparkAppHandle.getState();
+                // print log when state change
+                if (sparkAppHandle.getAppId() != null) {
+                  logger.info(
+                      "{} stateChanged: {}", sparkAppHandle.getAppId(), jobState.toString());
+                } else {
+                  logger.info("stateChanged: {}", jobState.toString());
+                }
+              }
+
+              @Override
+              public void infoChanged(SparkAppHandle sparkAppHandle) {}
+            });
+    sparkLauncher.setSparkAppHandle(sparkAppHandle);
   }
 
   private void addSparkArg(SparkLauncher sparkLauncher, String key, String value) {
@@ -117,6 +104,9 @@ public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAd
   }
 
   public boolean initJobId() {
-    return null != getApplicationId();
+    this.applicationId = sparkAppHandle.getAppId();
+    // While the job is running, wait for the appId; once the job reaches a final state, stop
+    // waiting so that a job which ends too early cannot block forever(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待)
+    return null != getApplicationId() || (jobState != null && jobState.isFinal());
   }
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
index faa90b03a..73173bb53 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
@@ -26,10 +26,7 @@ import org.apache.linkis.engineplugin.spark.client.deployment.{
   ClusterDescriptorAdapter,
   ClusterDescriptorAdapterFactory
 }
-import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
-  SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX,
-  SPARK_ONCE_APP_STATUS_FETCH_INTERVAL
-}
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.SPARK_ONCE_APP_STATUS_FETCH_INTERVAL
 import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
 import org.apache.linkis.engineplugin.spark.exception.ExecutorInitException
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
@@ -62,14 +59,6 @@ trait SparkOnceExecutor[T <: ClusterDescriptorAdapter]
       case (k, _) => k -> null
     }.toMap
     doSubmit(onceExecutorExecutionContext, options)
-    if (isCompleted) return
-    if (null == clusterDescriptorAdapter.getApplicationId)
-      throw new ExecutorInitException(
-        SparkErrorCodeSummary.YARN_APPLICATION_START_FAILED.getErrorCode,
-        SparkErrorCodeSummary.YARN_APPLICATION_START_FAILED.getErrorDesc
-      )
-    setApplicationId(clusterDescriptorAdapter.getApplicationId)
-    logger.info(s"Application is started, applicationId: $getApplicationId.")
   }
 
   protected def isCompleted: Boolean = isClosed || NodeStatus.isCompleted(getStatus)
@@ -96,61 +85,49 @@ trait SparkOnceExecutor[T <: ClusterDescriptorAdapter]
   }
 
   override protected def waitToRunning(): Unit = {
-    var waitingToFinished = false
-    if (!isCompleted)
+    if (!isCompleted) {
+      logger.info("start spark monitor thread")
       daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
         new Runnable {
           private var lastStatus: SparkAppHandle.State = _
           private var lastPrintTime = 0L
           private val printInterval =
             math.max(SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong, 5 * 60 * 1000)
-          private var fetchJobStatusFailedNum = 0
-
-          override def run(): Unit = if (!isCompleted && !waitingToFinished) {
-            val jobState = Utils.tryCatch(clusterDescriptorAdapter.getJobState) { t =>
-              val maxFailedNum = SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue
-              if (fetchJobStatusFailedNum >= maxFailedNum) {
-                val errMsg =
-                  s"Fetch job status has failed max $maxFailedNum times, now stop this SparkEngineConn."
-                logger.error(errMsg, t)
-                tryFailed()
-                close()
-              } else {
-                fetchJobStatusFailedNum += 1
-                logger.error(s"Fetch job status failed! retried ++$fetchJobStatusFailedNum...", t)
-              }
-              return
-            }
 
-            fetchJobStatusFailedNum = 0
+          override def run(): Unit = {
+            val jobState = clusterDescriptorAdapter.getJobState
             if (
-                jobState != lastStatus || System.currentTimeMillis - lastPrintTime >= printInterval
+                (jobState != null && jobState != lastStatus) || System.currentTimeMillis - lastPrintTime >= printInterval
             ) {
               logger.info(s"The jobState of $getApplicationId is $jobState.")
               lastPrintTime = System.currentTimeMillis
             }
+
             lastStatus = jobState
-            if (SparkAppHandle.State.FINISHED == lastStatus) {
-              waitingToFinished = true
-              logger.info("Job has finished, waiting for final status.")
-              Thread.sleep(5000)
-              logger.info(s"Job's final status ${clusterDescriptorAdapter.getJobState}.")
-            }
-            clusterDescriptorAdapter.getJobState match {
-              case SparkAppHandle.State.FAILED | SparkAppHandle.State.KILLED |
-                  SparkAppHandle.State.LOST =>
-                tryFailed()
-              case SparkAppHandle.State.FINISHED =>
-                trySucceed()
-              case _ =>
+            if (clusterDescriptorAdapter.isDisposed) {
+              // get final state again
+              lastStatus = clusterDescriptorAdapter.getJobState
+              logger.info(s"spark process is not alive, state ${lastStatus}")
+              lastStatus match {
+                case SparkAppHandle.State.FINISHED =>
+                  trySucceed()
+                case SparkAppHandle.State.FAILED | SparkAppHandle.State.KILLED |
+                    SparkAppHandle.State.LOST =>
+                  tryFailed()
+                case _ =>
+                  tryFailed()
+              }
             }
-            waitingToFinished = false
           }
         },
         SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong,
         SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong,
         TimeUnit.MILLISECONDS
       )
+    } else {
+      close()
+      logger.info("ready to start spark monitor thread, but job is final, so execute close")
+    }
   }
 
   override def supportCallBackLogs(): Boolean = true
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
index 8cec44d00..9fb68ed4c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
@@ -74,7 +74,11 @@ class SparkSubmitOnceExecutor(
   }
 
   override protected def waitToRunning(): Unit = {
+    // Wait until the task returns its applicationId (等待返回applicationId)
     Utils.waitUntil(() => clusterDescriptorAdapter.initJobId(), Duration.Inf)
+    // Synchronize applicationId to EC SparkOnceExecutor to facilitate user operations,
+    // such as obtaining progress and killing jobs(将applicationId同步给EC执行器,方便用户操作,如获取进度,kill任务等)
+    setApplicationId(clusterDescriptorAdapter.getApplicationId)
     super.waitToRunning()
   }
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/launcher/CustomSparkSubmitLauncher.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/launcher/CustomSparkSubmitLauncher.scala
new file mode 100644
index 000000000..951a1eb96
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/launcher/CustomSparkSubmitLauncher.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+class CustomSparkSubmitLauncher extends SparkLauncher {
+
+  var sparkAppHandle: SparkAppHandle = _
+
+  def setSparkAppHandle(sparkAppHandle: SparkAppHandle): Unit = this.sparkAppHandle = sparkAppHandle
+
+  def isDisposed: Boolean = {
+    if (sparkAppHandle != null) {
+      sparkAppHandle.asInstanceOf[AbstractAppHandle].isDisposed
+    } else {
+      false
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org