You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/07/12 05:47:58 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-5023 Support
cluster deployMode for Spark App which master is standalone
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new b2016f9 KYLIN-5023 Support cluster deployMode for Spark App which master is standalone
b2016f9 is described below
commit b2016f915db454e64b7018047b3df5c707d7946c
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Sat Jul 3 15:58:23 2021 +0800
KYLIN-5023 Support cluster deployMode for Spark App which master is standalone
---
.../org/apache/kylin/common/KylinConfigBase.java | 3 +
.../kylin/job/execution/AbstractExecutable.java | 6 +-
.../apache/spark/sql/common/LocalMetadata.scala | 3 -
.../kylin/engine/spark/job/NSparkExecutable.java | 16 +-
.../kylin/engine/spark/utils/MetaDumpUtil.java | 2 +-
.../kylin/engine/spark/utils/RestService.java | 93 ++++++++++++
.../spark/deploy/SparkApplicationClient.scala | 56 +++++++
.../apache/spark/deploy/StandaloneAppClient.scala | 116 ++++++++++++++
.../resources/response/application-detail-246.html | 167 +++++++++++++++++++++
.../resources/response/standalone-master-246.json | 94 ++++++++++++
.../spark/deploy/StandaloneAppClientTest.scala | 34 +++++
11 files changed, 582 insertions(+), 8 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0d2d11e..bccda65 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1541,6 +1541,9 @@ public abstract class KylinConfigBase implements Serializable {
}
+    /**
+     * Returns the HTTP address of the Spark Standalone master web UI, configured by
+     * {@code kylin.engine.spark.standalone.master.httpUrl}; empty when not configured.
+     */
+    public String getSparkStandaloneMasterWebUI() {
+        final String masterWebUiUrl = getOptional("kylin.engine.spark.standalone.master.httpUrl", "");
+        return masterWebUiUrl;
+    }
public String getKylinJobJarPath() {
final String jobJar = getOptional(KYLIN_ENGINE_MR_JOB_JAR);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 4c17d67..1f14508 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -699,9 +699,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return 1024;
}
- //Default driver memory base is 1024M
- //Adujst driver memory by cuboid number with stratogy[2 -> 20 -> 100]
- //If cuboid number is 10, then driver.memory=2*1024=2048M
+ // Default driver memory base is 1024M
+ // Adjust driver memory by cuboid number with strategy [2 -> 20 -> 100]
+ // If cuboid number is 10, then driver.memory=2*1024=2048M
public static Integer computeDriverMemory(Integer cuboidNum) {
KylinConfig config = KylinConfig.getInstanceFromEnv();
int[] driverMemoryStrategy = config.getSparkEngineDriverMemoryStrategy();
diff --git a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
index ab6b182..d7c7f5d 100644
--- a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
+++ b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
@@ -18,9 +18,6 @@
package org.apache.spark.sql.common
-import java.io.{File, IOException}
-
-import org.apache.commons.io.FileUtils
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.{LocalFileMetadataTestCase, TempMetadataBuilder}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index efbe417..6859e65 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -63,6 +63,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.spark.deploy.SparkApplicationClient;
import org.apache.spark.utils.SparkVersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +84,7 @@ public class NSparkExecutable extends AbstractExecutable {
private static final String APP_JAR_NAME = "__app__.jar";
private volatile boolean isYarnCluster = false;
+ private volatile boolean isStandaloneCluster = false;
protected void setSparkSubmitClassName(String className) {
this.setParam(MetadataConstants.P_CLASS_NAME, className);
@@ -165,6 +167,9 @@ public class NSparkExecutable extends AbstractExecutable {
}
}
+ /**
+ * Dump Kylin metadata and persist it to HDFS (see getDistMetaUrl) so that the Spark application can read it.
+ */
void attachMetadataAndKylinProps(KylinConfig config) throws IOException {
// The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value.
Set<String> dumpList = getMetadataDumpList(config);
@@ -272,6 +277,9 @@ public class NSparkExecutable extends AbstractExecutable {
CliCommandExecutor exec = new CliCommandExecutor();
exec.execute(cmd, patternedLogger, jobId);
+ if (isStandaloneCluster) {
+ SparkApplicationClient.awaitAndCheckAppState(SparkApplicationClient.STANDALONE_CLUSTER(), jobId);
+ }
updateMetaAfterOperation(config);
//Add metrics information to execute result for JobMetricsFacade
getManager().addJobInfo(getId(), getJobMetricsInfo(config));
@@ -297,6 +305,12 @@ public class NSparkExecutable extends AbstractExecutable {
&& "cluster".equals(sparkConfigOverride.get(DEPLOY_MODE)) && !(this instanceof NSparkLocalStep)) {
this.isYarnCluster = true;
}
+
+ if (sparkConfigOverride.get(SPARK_MASTER).toLowerCase(Locale.ROOT).startsWith("spark")
+ && "cluster".equals(sparkConfigOverride.get(DEPLOY_MODE)) && !(this instanceof NSparkLocalStep)) {
+ this.isStandaloneCluster = true;
+ }
+
if (!sparkConfigOverride.containsKey("spark.driver.memory")) {
sparkConfigOverride.put("spark.driver.memory", computeStepDriverMemory() + "m");
}
@@ -434,7 +448,7 @@ public class NSparkExecutable extends AbstractExecutable {
private ExecuteResult runLocalMode(String appArgs, KylinConfig config) {
try {
Class<? extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
- appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs });
+ appClz.getMethod("main", String[].class).invoke(null, (Object) new String[]{appArgs});
updateMetaAfterOperation(config);
//Add metrics information to execute result for JobMetricsFacade
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
index c2ef2c9..9121bdb 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
@@ -80,7 +80,7 @@ public class MetaDumpUtil {
}
KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
- //upload metadata
+ // upload metadata
new ResourceTool().copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
}
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java
new file mode 100644
index 0000000..71f76cc
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.utils;
+
+import java.io.IOException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A minimal blocking HTTP client used to query REST/JSON endpoints (e.g. the Spark
+ * Standalone master web UI). Every request uses a freshly created client which is shut
+ * down as soon as the request completes.
+ */
+public class RestService {
+    private static final Logger logger = LoggerFactory.getLogger(RestService.class);
+
+    // Longest prefix of a response body written to the trace log.
+    private static final int MAX_TRACE_LENGTH = 500;
+
+    private final int connectionTimeout;
+    private final int readTimeout;
+
+    /**
+     * @param connectionTimeout default connect timeout, in milliseconds
+     * @param readTimeout       default socket read timeout, in milliseconds
+     */
+    public RestService(int connectionTimeout, int readTimeout) {
+        this.connectionTimeout = connectionTimeout;
+        this.readTimeout = readTimeout;
+    }
+
+    /**
+     * Sends a GET request to {@code url} using the default timeouts of this service.
+     *
+     * @return the response body
+     * @throws IOException on transport failure or when the response status is not 200
+     */
+    public String getRequest(String url) throws IOException {
+        return getRequest(url, connectionTimeout, readTimeout);
+    }
+
+    /**
+     * Sends a GET request to {@code url} with explicit timeouts, in milliseconds.
+     *
+     * @return the response body
+     * @throws IOException on transport failure or when the response status is not 200
+     */
+    public String getRequest(String url, int connTimeout, int readTimeout) throws IOException {
+        HttpGet request = new HttpGet(url);
+        return execRequest(request, connTimeout, readTimeout);
+    }
+
+    private HttpClient getHttpClient(int connectionTimeout, int readTimeout) {
+        final HttpParams httpParams = new BasicHttpParams();
+        HttpConnectionParams.setSoTimeout(httpParams, readTimeout);
+        HttpConnectionParams.setConnectionTimeout(httpParams, connectionTimeout);
+        return new DefaultHttpClient(httpParams);
+    }
+
+    /**
+     * Executes {@code request} and returns the response body as a string, decoded as UTF-8
+     * unless the response declares another charset.
+     *
+     * @throws IOException on transport failure or when the status code is not 200; the
+     *                     message then contains the request URI and the response body
+     */
+    public String execRequest(HttpRequestBase request, int connectionTimeout, int readTimeout) throws IOException {
+        HttpClient httpClient = getHttpClient(connectionTimeout, readTimeout);
+        try {
+            HttpResponse response = httpClient.execute(request);
+            HttpEntity entity = response.getEntity();
+            // A response may carry no body at all; EntityUtils rejects a null entity.
+            String msg = entity == null ? "" : EntityUtils.toString(entity, "UTF-8");
+            int code = response.getStatusLine().getStatusCode();
+            if (logger.isTraceEnabled()) {
+                String displayMessage = msg.length() > MAX_TRACE_LENGTH ? msg.substring(0, MAX_TRACE_LENGTH) : msg;
+                logger.trace("Send request: {}. And receive response[{}] which length is {}, and content is {}.",
+                        request.getRequestLine(), code, msg.length(), displayMessage);
+            }
+            if (code != 200) {
+                throw new IOException("Invalid http response " + code + " when send request: "
+                        + request.getURI().toString() + "\n" + msg);
+            }
+            return msg;
+        } catch (IOException e) {
+            logger.error("error when send http request:" + request.getURI().toString(), e);
+            throw e;
+        } finally {
+            request.releaseConnection();
+            // DefaultHttpClient owns its connection manager; shut it down so sockets are not leaked.
+            httpClient.getConnectionManager().shutdown();
+        }
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
new file mode 100644
index 0000000..5b2caf4
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A client which asks the cluster manager for an application's state until the application
+ * has exited. Only the Spark Standalone master is supported for now.
+ */
+object SparkApplicationClient extends Logging {
+
+  // Copied from org.apache.spark.deploy.master.ApplicationState
+  val finalStates = Set("FINISHED", "FAILED", "KILLED", "UNKNOWN")
+
+  // master is spark standalone and deployMode is cluster
+  val STANDALONE_CLUSTER: String = "standalone_cluster"
+
+  // Delay between two consecutive state queries, in milliseconds.
+  private val pollIntervalMs: Long = 10000L
+
+  /**
+   * Polls the state of the application whose name contains `stepId` until it reaches one of
+   * [[finalStates]], then returns that state.
+   *
+   * Currently only for Spark jobs whose master is Spark Standalone and whose deployMode is
+   * cluster, because the standalone client lacks property "spark.standalone.submit.waitAppCompletion".
+   *
+   * This method does not fail when the application ends as FAILED or KILLED; callers must
+   * inspect the returned state. While the application is not found in the master's response
+   * its state is reported as not final, so this method keeps waiting.
+   *
+   * @param sparkMaster kind of cluster the application runs on; only [[STANDALONE_CLUSTER]] is supported
+   * @param stepId      id of the job step, matched against the application name
+   * @return the final state of the application
+   * @throws UnsupportedOperationException if `sparkMaster` is not supported
+   */
+  def awaitAndCheckAppState(sparkMaster: String, stepId: String): String = {
+    logInfo(s"AwaitAndCheckAppState $stepId ...")
+    sparkMaster match {
+      case STANDALONE_CLUSTER =>
+        var appState = StandaloneAppClient.getAppState(stepId)
+        // Sleep only while the application is still alive; return as soon as it has exited.
+        while (!finalStates.contains(appState)) {
+          logInfo(s"$stepId state is $appState .")
+          Thread.sleep(pollIntervalMs)
+          appState = StandaloneAppClient.getAppState(stepId)
+        }
+        logInfo(s"$stepId state is $appState .")
+        appState
+      case m => throw new UnsupportedOperationException("waitAndCheckAppState " + m)
+    }
+  }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala
new file mode 100644
index 0000000..15f456a
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.engine.spark.utils.RestService
+import org.apache.spark.internal.Logging
+
+import java.io.IOException
+import scala.collection.mutable
+import scala.util.Try
+import scala.util.parsing.json.JSON._
+
+/**
+ * Tracks the state of Kylin build-step applications (whose name contains "job_step_") by
+ * polling the JSON endpoint of the Spark Standalone master web UI.
+ *
+ * All reads and writes of the cache are synchronized on `cachedKylinJobMap`.
+ */
+object StandaloneAppClient extends Logging {
+
+  private val JOB_STEP_PREFIX = "job_step_"
+
+  // appId -> (appName, state, starttime)
+  private val cachedKylinJobMap: mutable.Map[String, (String, String, Long)] =
+    new mutable.LinkedHashMap[String, (String, String, Long)]()
+
+  // Time of the last refresh from the master; guarded by cachedKylinJobMap.
+  private var jobInfoUpdateTime = System.currentTimeMillis()
+  // Minimum delay between two requests to the master, in milliseconds.
+  private val refreshIntervalMs = 10000L
+  // Entries started longer ago than this (5 days, in milliseconds) are evictable.
+  private val cacheTtl = 5L * 24 * 3600 * 1000
+  // Eviction only runs once the cache holds more entries than this.
+  private val cacheMaxSize = 30000
+
+  // Sections of the master's JSON response which list applications.
+  private val appSections = Seq("activeapps", "completedapps")
+
+  // JSON endpoint of the master web UI; a trailing '/' in the configured URL is tolerated.
+  private val masterUrlJson: String =
+    KylinConfig.getInstanceFromEnv.getSparkStandaloneMasterWebUI.stripSuffix("/") + "/json"
+
+  private val restService: RestService = new RestService(10000, 10000)
+
+  /**
+   * Returns the cached Kylin build-step applications, refreshing them from the master first
+   * when the cache is empty or was last refreshed at least 10 seconds ago. A failed refresh
+   * is logged and the previous content is returned.
+   *
+   * @see org.apache.spark.deploy.master.ApplicationInfo
+   * @return appId -> (appName, state, startTime); this is the live cache, not a copy
+   */
+  def getRunningJobs: mutable.Map[String, (String, String, Long)] = cachedKylinJobMap.synchronized {
+    val currMills = System.currentTimeMillis
+    if (cachedKylinJobMap.isEmpty || currMills - jobInfoUpdateTime >= refreshIntervalMs) {
+      logDebug("Updating app status ...")
+      try {
+        parseApplicationState(restService.getRequest(masterUrlJson))
+      } catch {
+        case ioe: IOException => logError("Can not connect to standalone master service.", ioe)
+        case e: Exception => logError("Error .", e)
+      }
+      jobInfoUpdateTime = currMills
+    }
+    cachedKylinJobMap
+  }
+
+  /**
+   * Returns the state of the application whose name contains `stepId`: the state of the most
+   * recently started one if the step was submitted several times, or "SUBMITTED" if no such
+   * application is known yet.
+   */
+  def getAppState(stepId: String): String = cachedKylinJobMap.synchronized {
+    val matched = getRunningJobs.values.filter(app => app._1.contains(stepId))
+    if (matched.isEmpty) "SUBMITTED" else matched.maxBy(app => app._3)._2
+  }
+
+  /**
+   * Parses the JSON document served by the standalone master at `/json` and caches every
+   * active or completed application whose name contains "job_step_". Missing or malformed
+   * fields default to "" (strings) and 0 (start time).
+   *
+   * @throws IllegalArgumentException if `responseStr` is not a JSON object
+   */
+  def parseApplicationState(responseStr: String): Unit = cachedKylinJobMap.synchronized {
+    val curr = System.currentTimeMillis()
+    val respJson = parseFull(responseStr) match {
+      case Some(json: Map[_, _]) => json.asInstanceOf[Map[String, Any]]
+      case _ => throw new IllegalArgumentException(
+        "Response of standalone master is not a JSON object: " + responseStr.take(200))
+    }
+
+    for (section <- appSections; app <- appsOf(respJson, section)) {
+      val name = stringField(app, "name")
+      if (name.contains(JOB_STEP_PREFIX)) {
+        cachedKylinJobMap(stringField(app, "id")) = (name, stringField(app, "state"), longField(app, "starttime"))
+      }
+    }
+
+    evictExpired(curr)
+  }
+
+  // Application entries listed under `section`; empty when the section is absent or malformed.
+  private def appsOf(respJson: Map[String, Any], section: String): List[Map[String, Any]] =
+    respJson.get(section) match {
+      case Some(apps: List[_]) => apps.collect { case app: Map[_, _] => app.asInstanceOf[Map[String, Any]] }
+      case _ => Nil
+    }
+
+  private def stringField(app: Map[String, Any], key: String): String = app.get(key) match {
+    case Some(value: String) => value
+    case _ => ""
+  }
+
+  // JSON numbers are parsed as Double; a number quoted as a string is accepted as well.
+  private def longField(app: Map[String, Any], key: String): Long = app.get(key) match {
+    case Some(value: Number) => value.longValue()
+    case Some(value: String) => Try(value.trim.toLong).getOrElse(0L)
+    case _ => 0L
+  }
+
+  // Once the cache holds more than cacheMaxSize entries, drop those started more than cacheTtl ago.
+  // Expired ids are collected first so that the map is not modified while being iterated.
+  private def evictExpired(now: Long): Unit = {
+    if (cachedKylinJobMap.size > cacheMaxSize) {
+      val expired = cachedKylinJobMap.collect { case (id, (_, _, startTime)) if now - startTime > cacheTtl => id }
+      cachedKylinJobMap --= expired
+    }
+  }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html
new file mode 100644
index 0000000..9371cee
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html
@@ -0,0 +1,167 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <meta http-equiv="Content-type" content="text/html; charset=utf-8"/>
+ <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css"/>
+ <link rel="stylesheet" href="/static/vis-timeline-graph2d.min.css" type="text/css"/>
+ <link rel="stylesheet" href="/static/webui.css" type="text/css"/>
+ <link rel="stylesheet" href="/static/timeline-view.css" type="text/css"/>
+ <script src="/static/sorttable.js"></script>
+ <script src="/static/jquery-1.12.4.min.js"></script>
+ <script src="/static/vis-timeline-graph2d.min.js"></script>
+ <script src="/static/bootstrap-tooltip.js"></script>
+ <script src="/static/initialize-tooltips.js"></script>
+ <script src="/static/table.js"></script>
+ <script src="/static/additional-metrics.js"></script>
+ <script src="/static/timeline-view.js"></script>
+ <script src="/static/log-view.js"></script>
+ <script src="/static/webui.js"></script>
+ <script>setUIRoot('')</script>
+
+ <link rel="shortcut icon" href="/static/spark-logo-77x50px-hd.png"></link>
+ <title>Application: job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01</title>
+</head>
+<body>
+<div class="container-fluid">
+ <div class="row-fluid">
+ <div class="span12">
+ <h3 style="vertical-align: middle; display: inline-block;">
+ <a style="text-decoration: none" href="/">
+ <img src="/static/spark-logo-77x50px-hd.png"/>
+ <span class="version" style="margin-right: 15px;">2.4.6</span>
+ </a>
+ Application: job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01
+ </h3>
+ </div>
+ </div>
+ <div class="row-fluid">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>ID:</strong> app-20210701234053-0002</li>
+ <li><strong>Name:</strong> job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01</li>
+ <li><strong>User:</strong> root</li>
+ <li><strong>Cores:</strong>
+ Unlimited (4 granted)
+ </li>
+ <li>
+ <span data-toggle="tooltip" title="Maximum number of executors that this application will use. This limit is finite only when
+ dynamic allocation is enabled. The number of granted executors may exceed the limit
+ ephemerally when executors are being killed.
+ " data-placement="right">
+ <strong>Executor Limit: </strong>
+ Unlimited
+ (4 granted)
+ </span>
+ </li>
+ <li>
+ <strong>Executor Memory:</strong>
+ 1024.0 MB
+ </li>
+ <li><strong>Submit Date:</strong> 2021/07/01 23:40:53</li>
+ <li><strong>State:</strong> FINISHED</li>
+
+ </ul>
+ </div>
+ </div>
+ <div class="row-fluid"> <!-- Executors -->
+ <div class="span12">
+ <span class="collapse-aggregated-executors collapse-table"
+ onClick="collapseTable('collapse-aggregated-executors','aggregated-executors')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Executor Summary (4)</a>
+ </h4>
+ </span>
+ <div class="aggregated-executors collapsible-table">
+ <table class="table table-bordered table-condensed table-striped sortable">
+ <thead>
+ <th width="" class="">ExecutorID</th>
+ <th width="" class="">Worker</th>
+ <th width="" class="">Cores</th>
+ <th width="" class="">Memory</th>
+ <th width="" class="">State</th>
+ <th width="" class="">Logs</th>
+ </thead>
+ <tbody>
+
+ </tbody>
+ </table>
+ </div>
+ <span class="collapse-aggregated-removedExecutors collapse-table" onClick="collapseTable('collapse-aggregated-removedExecutors',
+ 'aggregated-removedExecutors')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Removed Executors (4)</a>
+ </h4>
+ </span>
+ <div class="aggregated-removedExecutors collapsible-table">
+ <table class="table table-bordered table-condensed table-striped sortable">
+ <thead>
+ <th width="" class="">ExecutorID</th>
+ <th width="" class="">Worker</th>
+ <th width="" class="">Cores</th>
+ <th width="" class="">Memory</th>
+ <th width="" class="">State</th>
+ <th width="" class="">Logs</th>
+ </thead>
+ <tbody>
+ <tr>
+ <td>2</td>
+ <td>
+ <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a>
+ </td>
+ <td>1</td>
+ <td>1024</td>
+ <td>KILLED</td>
+ <td>
+ <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=2&logType=stdout">stdout</a>
+ <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=2&logType=stderr">stderr</a>
+ </td>
+ </tr>
+ <tr>
+ <td>1</td>
+ <td>
+ <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a>
+ </td>
+ <td>1</td>
+ <td>1024</td>
+ <td>KILLED</td>
+ <td>
+ <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=1&logType=stdout">stdout</a>
+ <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=1&logType=stderr">stderr</a>
+ </td>
+ </tr>
+ <tr>
+ <td>3</td>
+ <td>
+ <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a>
+ </td>
+ <td>1</td>
+ <td>1024</td>
+ <td>KILLED</td>
+ <td>
+ <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=3&logType=stdout">stdout</a>
+ <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=3&logType=stderr">stderr</a>
+ </td>
+ </tr>
+ <tr>
+ <td>0</td>
+ <td>
+ <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a>
+ </td>
+ <td>1</td>
+ <td>1024</td>
+ <td>KILLED</td>
+ <td>
+ <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=0&logType=stdout">stdout</a>
+ <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=0&logType=stderr">stderr</a>
+ </td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ </div>
+ </div>
+</div>
+</body>
+</html>
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json
new file mode 100644
index 0000000..aa02873
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json
@@ -0,0 +1,94 @@
+{
+ "url": "spark://cdh-master:10030",
+ "workers": [
+ {
+ "id": "worker-20210630172110-10.1.3.90-10040",
+ "host": "10.1.3.90",
+ "port": 10040,
+ "webuiaddress": "http://10.1.3.90:10041",
+ "cores": 5,
+ "coresused": 0,
+ "coresfree": 5,
+ "memory": 20480,
+ "memoryused": 0,
+ "memoryfree": 20480,
+ "state": "ALIVE",
+ "lastheartbeat": 1625193717228
+ }
+ ],
+ "aliveworkers": 1,
+ "cores": 5,
+ "coresused": 0,
+ "memory": 20480,
+ "memoryused": 0,
+ "activeapps": [],
+ "completedapps": [
+ {
+ "id": "app-20210701230444-0000",
+ "starttime": 1625151884352,
+ "name": "job_step_6eb0d430-2882-4699-9915-1154959c2cd8-01",
+ "cores": 4,
+ "user": "root",
+ "memoryperslave": 1024,
+ "submitdate": "Thu Jul 01 23:04:44 CST 2021",
+ "state": "FINISHED",
+ "duration": 26068
+ },
+ {
+ "id": "app-20210701232026-0001",
+ "starttime": 1625152826024,
+ "name": "job_step_6eb0d430-2882-4699-9915-1154959c2cd8-01",
+ "cores": 4,
+ "user": "root",
+ "memoryperslave": 1024,
+ "submitdate": "Thu Jul 01 23:20:26 CST 2021",
+ "state": "FINISHED",
+ "duration": 29656
+ },
+ {
+ "id": "app-20210701234053-0002",
+ "starttime": 1625154053758,
+ "name": "job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01",
+ "cores": 4,
+ "user": "root",
+ "memoryperslave": 1024,
+ "submitdate": "Thu Jul 01 23:40:53 CST 2021",
+ "state": "FINISHED",
+ "duration": 27223
+ }
+ ],
+ "activedrivers": [],
+ "completeddrivers": [
+ {
+ "id": "driver-20210701230402-0000",
+ "starttime": "1625151842605",
+ "state": "FINISHED",
+ "cores": 1,
+ "memory": 1024,
+ "submitdate": "Thu Jul 01 23:04:02 CST 2021",
+ "worker": "worker-20210630172110-10.1.3.90-10040",
+ "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry"
+ },
+ {
+ "id": "driver-20210701232014-0001",
+ "starttime": "1625152814554",
+ "state": "FINISHED",
+ "cores": 1,
+ "memory": 1024,
+ "submitdate": "Thu Jul 01 23:20:14 CST 2021",
+ "worker": "worker-20210630172110-10.1.3.90-10040",
+ "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry"
+ },
+ {
+ "id": "driver-20210701233954-0002",
+ "starttime": "1625153994011",
+ "state": "FINISHED",
+ "cores": 1,
+ "memory": 1024,
+ "submitdate": "Thu Jul 01 23:39:54 CST 2021",
+ "worker": "worker-20210630172110-10.1.3.90-10040",
+ "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry"
+ }
+ ],
+ "status": "ALIVE"
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala
new file mode 100644
index 0000000..a1fabfe
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.commons.io.IOUtils
+import org.apache.spark.sql.common.{LocalMetadata, SparderBaseFunSuite}
+
+import java.nio.charset.StandardCharsets
+
+class StandaloneAppClientTest extends SparderBaseFunSuite with LocalMetadata {
+
+  // Reads a classpath resource fully as a UTF-8 string, always closing the stream.
+  private def readResource(path: String): String = {
+    val in = getClass.getClassLoader.getResourceAsStream(path)
+    assert(in != null, s"Missing test resource $path")
+    try IOUtils.toString(in, StandardCharsets.UTF_8) finally in.close()
+  }
+
+  test("Test find state function from JSON response.") {
+    // Captured response of the standalone master's `/json` endpoint.
+    val jsonStr = readResource("response/standalone-master-246.json")
+
+    StandaloneAppClient.parseApplicationState(jsonStr)
+    val res = StandaloneAppClient.getRunningJobs
+
+    assert(res.size == 3)
+    assert(res.contains("app-20210701232026-0001"))
+    assert(res.getOrElse("app-20210701232026-0001", ("", "", 1L))._2 == "FINISHED")
+
+    // Step 6eb0d430-... was submitted twice; the state of the latest submission is reported.
+    assert(StandaloneAppClient.getAppState("6eb0d430-2882-4699-9915-1154959c2cd8") == "FINISHED")
+  }
+}