Posted to commits@linkis.apache.org by pe...@apache.org on 2023/02/21 11:36:35 UTC
[linkis] branch dev-1.3.2 updated: [Bugfix] The spark jar job status is correctly obtained (#4249)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 9887de3e7 [Bugfix] The spark jar job status is correctly obtained (#4249)
9887de3e7 is described below
commit 9887de3e75f3eb483c02ca3b5d91855b1cf48c16
Author: 人生有如两个橘子 <15...@163.com>
AuthorDate: Tue Feb 21 19:36:31 2023 +0800
[Bugfix] The spark jar job status is correctly obtained (#4249)
* [bugfix] The spark jar job status is not correctly obtained
---
.../deployment/ClusterDescriptorAdapter.java | 16 ++---
.../YarnApplicationClusterDescriptorAdapter.java | 62 ++++++++-----------
.../spark/executor/SparkOnceExecutor.scala | 71 ++++++++--------------
.../spark/executor/SparkSubmitOnceExecutor.scala | 4 ++
.../spark/launcher/CustomSparkSubmitLauncher.scala | 34 +++++++++++
5 files changed, 93 insertions(+), 94 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
index 4607f2e4b..16b7613cf 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
@@ -19,8 +19,8 @@ package org.apache.linkis.engineplugin.spark.client.deployment;
import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import org.apache.spark.launcher.CustomSparkSubmitLauncher;
import org.apache.spark.launcher.SparkAppHandle;
-import org.apache.spark.launcher.SparkLauncher;
import java.io.Closeable;
@@ -32,7 +32,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
protected final ExecutionContext executionContext;
protected String applicationId;
- protected SparkLauncher sparkLauncher;
+ protected CustomSparkSubmitLauncher sparkLauncher;
protected SparkAppHandle sparkAppHandle;
protected SparkAppHandle.State jobState;
@@ -49,14 +49,8 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
return jobState;
}
- /** Cancel the spark job. */
- public void cancelJob() {
- if (sparkAppHandle != null) {
- logger.info("Start to cancel job {}.", sparkAppHandle.getAppId());
- this.stopJob();
- } else {
- logger.warn("Cancel job: sparkAppHandle is null");
- }
+ public boolean isDisposed() {
+ return sparkLauncher.isDisposed();
}
@Override
@@ -78,7 +72,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
if (sparkAppHandle == null) {
return;
}
- if (sparkAppHandle.getState().isFinal()) {
+ if (isDisposed()) {
logger.info("Job has finished, stop action return.");
return;
}
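The guard change above is the core of the fix: the handle's reported state can lag behind reality when the spark-submit child process dies abruptly, whereas isDisposed() reflects whether the launcher has already reaped the handle. Below is a minimal Scala sketch of the resulting stop path, assuming close() drives the adapter's stop logic as its Closeable implementation suggests; stopIfAlive is a hypothetical helper, not part of this patch.

    import org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter

    // Only attempt to stop when the launcher handle has not been disposed;
    // a disposed handle means spark-submit already exited with a final state,
    // so there is nothing left to stop.
    def stopIfAlive(adapter: ClusterDescriptorAdapter): Unit = {
      if (!adapter.isDisposed) {
        adapter.close()
      }
    }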
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
index 1cdfb374e..21f2885b4 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
@@ -21,13 +21,13 @@ import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.launcher.CustomSparkSubmitLauncher;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
@@ -36,39 +36,10 @@ public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAd
}
public void deployCluster(String mainClass, String args, Map<String, String> confMap)
- throws IOException, InterruptedException {
+ throws IOException {
SparkConfig sparkConfig = executionContext.getSparkConfig();
- CountDownLatch countDownLatch = new CountDownLatch(1);
- SparkAppHandle.Listener listener =
- new SparkAppHandle.Listener() {
- @Override
- public void stateChanged(SparkAppHandle sparkAppHandle) {
- jobState = sparkAppHandle.getState();
- if (sparkAppHandle.getAppId() != null) {
- countDownLatch.countDown();
- applicationId = sparkAppHandle.getAppId();
- logger.info("{} stateChanged: {}", applicationId, jobState.toString());
- } else {
- if (jobState.isFinal()) {
- countDownLatch.countDown();
- }
- logger.info("stateChanged: {}", jobState.toString());
- }
- }
-
- @Override
- public void infoChanged(SparkAppHandle sparkAppHandle) {
- jobState = sparkAppHandle.getState();
- if (sparkAppHandle.getAppId() != null) {
- logger.info("{} infoChanged: {}", sparkAppHandle.getAppId(), jobState.toString());
- } else {
- logger.info("infoChanged: {}", jobState.toString());
- }
- }
- };
-
- sparkLauncher = new SparkLauncher();
+ sparkLauncher = new CustomSparkSubmitLauncher();
// region set args
sparkLauncher
.setJavaHome(sparkConfig.getJavaHome())
@@ -105,9 +76,25 @@ public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAd
.filter(StringUtils::isNotBlank)
.forEach(arg -> sparkLauncher.addAppArgs(arg));
// sparkLauncher.addAppArgs(args);
- // endregion
- sparkAppHandle = sparkLauncher.startApplication(listener);
- countDownLatch.await();
+ sparkAppHandle =
+ sparkLauncher.startApplication(
+ new SparkAppHandle.Listener() {
+ @Override
+ public void stateChanged(SparkAppHandle sparkAppHandle) {
+ jobState = sparkAppHandle.getState();
+ // print a log entry when the state changes
+ if (sparkAppHandle.getAppId() != null) {
+ logger.info(
+ "{} stateChanged: {}", sparkAppHandle.getAppId(), jobState.toString());
+ } else {
+ logger.info("stateChanged: {}", jobState.toString());
+ }
+ }
+
+ @Override
+ public void infoChanged(SparkAppHandle sparkAppHandle) {}
+ });
+ sparkLauncher.setSparkAppHandle(sparkAppHandle);
}
private void addSparkArg(SparkLauncher sparkLauncher, String key, String value) {
@@ -117,6 +104,9 @@ public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAd
}
public boolean initJobId() {
- return null != getApplicationId();
+ this.applicationId = sparkAppHandle.getAppId();
+ // When the job has not finished, monitor the appId; once it has, monitor the
+ // state instead. This mainly guards against a job that ends before an appId
+ // is assigned, which would otherwise leave the caller waiting forever.
+ return null != getApplicationId() || (jobState != null && jobState.isFinal());
}
}
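The revised initJobId() no longer waits solely for an appId: it also returns once the job has reached a final state, so a job that dies before YARN assigns an application id can no longer hang the caller. A minimal Scala sketch of that predicate against the stock SparkAppHandle API; jobIdReady is a hypothetical name for illustration.

    import org.apache.spark.launcher.SparkAppHandle

    // True once either an application id exists or the job is already final.
    def jobIdReady(handle: SparkAppHandle): Boolean = {
      val state = handle.getState
      handle.getAppId != null || (state != null && state.isFinal)
    }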
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
index faa90b03a..73173bb53 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
@@ -26,10 +26,7 @@ import org.apache.linkis.engineplugin.spark.client.deployment.{
ClusterDescriptorAdapter,
ClusterDescriptorAdapterFactory
}
-import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
- SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX,
- SPARK_ONCE_APP_STATUS_FETCH_INTERVAL
-}
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.SPARK_ONCE_APP_STATUS_FETCH_INTERVAL
import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
import org.apache.linkis.engineplugin.spark.exception.ExecutorInitException
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
@@ -62,14 +59,6 @@ trait SparkOnceExecutor[T <: ClusterDescriptorAdapter]
case (k, _) => k -> null
}.toMap
doSubmit(onceExecutorExecutionContext, options)
- if (isCompleted) return
- if (null == clusterDescriptorAdapter.getApplicationId)
- throw new ExecutorInitException(
- SparkErrorCodeSummary.YARN_APPLICATION_START_FAILED.getErrorCode,
- SparkErrorCodeSummary.YARN_APPLICATION_START_FAILED.getErrorDesc
- )
- setApplicationId(clusterDescriptorAdapter.getApplicationId)
- logger.info(s"Application is started, applicationId: $getApplicationId.")
}
protected def isCompleted: Boolean = isClosed || NodeStatus.isCompleted(getStatus)
@@ -96,61 +85,49 @@ trait SparkOnceExecutor[T <: ClusterDescriptorAdapter]
}
override protected def waitToRunning(): Unit = {
- var waitingToFinished = false
- if (!isCompleted)
+ if (!isCompleted) {
+ logger.info("start spark monitor thread")
daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable {
private var lastStatus: SparkAppHandle.State = _
private var lastPrintTime = 0L
private val printInterval =
math.max(SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong, 5 * 60 * 1000)
- private var fetchJobStatusFailedNum = 0
-
- override def run(): Unit = if (!isCompleted && !waitingToFinished) {
- val jobState = Utils.tryCatch(clusterDescriptorAdapter.getJobState) { t =>
- val maxFailedNum = SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue
- if (fetchJobStatusFailedNum >= maxFailedNum) {
- val errMsg =
- s"Fetch job status has failed max $maxFailedNum times, now stop this SparkEngineConn."
- logger.error(errMsg, t)
- tryFailed()
- close()
- } else {
- fetchJobStatusFailedNum += 1
- logger.error(s"Fetch job status failed! retried ++$fetchJobStatusFailedNum...", t)
- }
- return
- }
- fetchJobStatusFailedNum = 0
+ override def run(): Unit = {
+ val jobState = clusterDescriptorAdapter.getJobState
if (
- jobState != lastStatus || System.currentTimeMillis - lastPrintTime >= printInterval
+ (jobState != null && jobState != lastStatus) || System.currentTimeMillis - lastPrintTime >= printInterval
) {
logger.info(s"The jobState of $getApplicationId is $jobState.")
lastPrintTime = System.currentTimeMillis
}
+
lastStatus = jobState
- if (SparkAppHandle.State.FINISHED == lastStatus) {
- waitingToFinished = true
- logger.info("Job has finished, waiting for final status.")
- Thread.sleep(5000)
- logger.info(s"Job's final status ${clusterDescriptorAdapter.getJobState}.")
- }
- clusterDescriptorAdapter.getJobState match {
- case SparkAppHandle.State.FAILED | SparkAppHandle.State.KILLED |
- SparkAppHandle.State.LOST =>
- tryFailed()
- case SparkAppHandle.State.FINISHED =>
- trySucceed()
- case _ =>
+ if (clusterDescriptorAdapter.isDisposed) {
+ // get final state again
+ lastStatus = clusterDescriptorAdapter.getJobState
+ logger.info(s"spark process is not alive, state ${lastStatus}")
+ lastStatus match {
+ case SparkAppHandle.State.FINISHED =>
+ trySucceed()
+ case SparkAppHandle.State.FAILED | SparkAppHandle.State.KILLED |
+ SparkAppHandle.State.LOST =>
+ tryFailed()
+ case _ =>
+ tryFailed()
+ }
}
- waitingToFinished = false
}
},
SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong,
SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong,
TimeUnit.MILLISECONDS
)
+ } else {
+ close()
+ logger.info("ready to start spark monitor thread, but job is final, so execute close")
+ }
}
override def supportCallBackLogs(): Boolean = true
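The monitor thread above now settles the job only when the launcher reports the handle as disposed, then re-reads the state and maps FINISHED to success and everything else to failure. Here is a self-contained Scala sketch of that loop's shape, using plain java.util.concurrent in place of Linkis' Utils.defaultScheduler; the callback names and the 5-second interval are assumptions for illustration only.

    import java.util.concurrent.{Executors, TimeUnit}
    import org.apache.spark.launcher.SparkAppHandle

    def startMonitor(
        getJobState: () => SparkAppHandle.State,
        isDisposed: () => Boolean,
        onSuccess: () => Unit,
        onFailure: () => Unit
    ): Unit = {
      val scheduler = Executors.newSingleThreadScheduledExecutor()
      val intervalMs = 5000L // stand-in for SPARK_ONCE_APP_STATUS_FETCH_INTERVAL
      scheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = {
            if (isDisposed()) {
              // The spark-submit process is gone: read the state one last
              // time and settle; anything but FINISHED counts as failure.
              getJobState() match {
                case SparkAppHandle.State.FINISHED => onSuccess()
                case _ => onFailure()
              }
              scheduler.shutdown()
            }
          }
        },
        intervalMs,
        intervalMs,
        TimeUnit.MILLISECONDS
      )
    }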
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
index 8cec44d00..9fb68ed4c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
@@ -74,7 +74,11 @@ class SparkSubmitOnceExecutor(
}
override protected def waitToRunning(): Unit = {
+ // Wait until the task returns the applicationId
Utils.waitUntil(() => clusterDescriptorAdapter.initJobId(), Duration.Inf)
+ // Synchronize the applicationId to the EC SparkOnceExecutor so that users can
+ // operate on the job, e.g. fetch its progress or kill it.
+ setApplicationId(clusterDescriptorAdapter.getApplicationId)
super.waitToRunning()
}
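waitToRunning() now blocks on initJobId() before publishing the applicationId to the executor. A minimal Scala sketch of that wait, substituting a plain polling loop for Linkis' Utils.waitUntil; the 500 ms poll interval is an assumption, not a value from the patch.

    // Block until the adapter can report either an appId or a final state.
    def waitUntil(condition: () => Boolean, pollMs: Long = 500L): Unit = {
      while (!condition()) Thread.sleep(pollMs)
    }

    // usage, mirroring the diff:
    // waitUntil(() => clusterDescriptorAdapter.initJobId())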
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/launcher/CustomSparkSubmitLauncher.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/launcher/CustomSparkSubmitLauncher.scala
new file mode 100644
index 000000000..951a1eb96
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/launcher/CustomSparkSubmitLauncher.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+class CustomSparkSubmitLauncher extends SparkLauncher {
+
+ var sparkAppHandle: SparkAppHandle = _
+
+ def setSparkAppHandle(sparkAppHandle: SparkAppHandle): Unit = this.sparkAppHandle = sparkAppHandle
+
+ def isDisposed: Boolean = {
+ if (sparkAppHandle != null) {
+ sparkAppHandle.asInstanceOf[AbstractAppHandle].isDisposed
+ } else {
+ false
+ }
+ }
+
+}
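CustomSparkSubmitLauncher is deliberately declared in the org.apache.spark.launcher package: AbstractAppHandle and its isDisposed flag are package-private in Spark's launcher module, so placing the class in the same package is what lets it read whether the launcher has reaped the handle without resorting to reflection. A minimal usage sketch mirroring deployCluster; the launcher configuration calls are elided and launchAndTrack is a hypothetical wrapper.

    import org.apache.spark.launcher.{CustomSparkSubmitLauncher, SparkAppHandle}

    def launchAndTrack(): CustomSparkSubmitLauncher = {
      val launcher = new CustomSparkSubmitLauncher()
      // ... setJavaHome / setAppResource / setMainClass etc., as in deployCluster ...
      val handle: SparkAppHandle = launcher.startApplication()
      launcher.setSparkAppHandle(handle)
      launcher
    }

    // Later, a monitor can ask whether the spark-submit child was reaped:
    // if (launcher.isDisposed) { /* job is final; re-read handle.getState and settle */ }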