You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/07/12 05:47:58 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-5023 Support cluster deployMode for Spark App which master is standalone

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new b2016f9  KYLIN-5023 Support cluster deployMode for Spark App which master is standalone
b2016f9 is described below

commit b2016f915db454e64b7018047b3df5c707d7946c
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Sat Jul 3 15:58:23 2021 +0800

    KYLIN-5023 Support cluster deployMode for Spark App which master is standalone
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   3 +
 .../kylin/job/execution/AbstractExecutable.java    |   6 +-
 .../apache/spark/sql/common/LocalMetadata.scala    |   3 -
 .../kylin/engine/spark/job/NSparkExecutable.java   |  16 +-
 .../kylin/engine/spark/utils/MetaDumpUtil.java     |   2 +-
 .../kylin/engine/spark/utils/RestService.java      |  93 ++++++++++++
 .../spark/deploy/SparkApplicationClient.scala      |  56 +++++++
 .../apache/spark/deploy/StandaloneAppClient.scala  | 116 ++++++++++++++
 .../resources/response/application-detail-246.html | 167 +++++++++++++++++++++
 .../resources/response/standalone-master-246.json  |  94 ++++++++++++
 .../spark/deploy/StandaloneAppClientTest.scala     |  34 +++++
 11 files changed, 582 insertions(+), 8 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0d2d11e..bccda65 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1541,6 +1541,9 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
 
+    public String getSparkStandaloneMasterWebUI() {
+        return getOptional("kylin.engine.spark.standalone.master.httpUrl", "");
+    }
 
     public String getKylinJobJarPath() {
         final String jobJar = getOptional(KYLIN_ENGINE_MR_JOB_JAR);
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 4c17d67..1f14508 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -699,9 +699,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return 1024;
     }
 
-    //Default driver memory base is 1024M
-    //Adujst driver memory by cuboid number with stratogy[2 -> 20 -> 100]
-    //If cuboid number is 10, then driver.memory=2*1024=2048M
+    // Default driver memory base is 1024M
+    // Adjust driver memory by cuboid number with stratogy[2 -> 20 -> 100]
+    // If cuboid number is 10, then driver.memory=2*1024=2048M
     public static Integer computeDriverMemory(Integer cuboidNum) {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         int[] driverMemoryStrategy = config.getSparkEngineDriverMemoryStrategy();
diff --git a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
index ab6b182..d7c7f5d 100644
--- a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
+++ b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
@@ -18,9 +18,6 @@
 
 package org.apache.spark.sql.common
 
-import java.io.{File, IOException}
-
-import org.apache.commons.io.FileUtils
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.{LocalFileMetadataTestCase, TempMetadataBuilder}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index efbe417..6859e65 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -63,6 +63,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.spark.deploy.SparkApplicationClient;
 import org.apache.spark.utils.SparkVersionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,6 +84,7 @@ public class NSparkExecutable extends AbstractExecutable {
     private static final String APP_JAR_NAME = "__app__.jar";
 
     private volatile boolean isYarnCluster = false;
+    private volatile boolean isStandaloneCluster = false;
 
     protected void setSparkSubmitClassName(String className) {
         this.setParam(MetadataConstants.P_CLASS_NAME, className);
@@ -165,6 +167,9 @@ public class NSparkExecutable extends AbstractExecutable {
         }
     }
 
+    /**
+     * Dump metadata from Kylin Metadata and persist to HDFS(getDistMetaUrl) for Spark Application
+     */
     void attachMetadataAndKylinProps(KylinConfig config) throws IOException {
         // The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value.
         Set<String> dumpList = getMetadataDumpList(config);
@@ -272,6 +277,9 @@ public class NSparkExecutable extends AbstractExecutable {
 
             CliCommandExecutor exec = new CliCommandExecutor();
             exec.execute(cmd, patternedLogger, jobId);
+            if (isStandaloneCluster) {
+                SparkApplicationClient.awaitAndCheckAppState(SparkApplicationClient.STANDALONE_CLUSTER(), jobId);
+            }
             updateMetaAfterOperation(config);
             //Add metrics information to execute result for JobMetricsFacade
             getManager().addJobInfo(getId(), getJobMetricsInfo(config));
@@ -297,6 +305,12 @@ public class NSparkExecutable extends AbstractExecutable {
                 && "cluster".equals(sparkConfigOverride.get(DEPLOY_MODE)) && !(this instanceof NSparkLocalStep)) {
             this.isYarnCluster = true;
         }
+
+        if (sparkConfigOverride.get(SPARK_MASTER).toLowerCase(Locale.ROOT).startsWith("spark")
+                && "cluster".equals(sparkConfigOverride.get(DEPLOY_MODE)) && !(this instanceof NSparkLocalStep)) {
+            this.isStandaloneCluster = true;
+        }
+
         if (!sparkConfigOverride.containsKey("spark.driver.memory")) {
             sparkConfigOverride.put("spark.driver.memory", computeStepDriverMemory() + "m");
         }
@@ -434,7 +448,7 @@ public class NSparkExecutable extends AbstractExecutable {
     private ExecuteResult runLocalMode(String appArgs, KylinConfig config) {
         try {
             Class<? extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
-            appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs });
+            appClz.getMethod("main", String[].class).invoke(null, (Object) new String[]{appArgs});
             updateMetaAfterOperation(config);
 
             //Add metrics information to execute result for JobMetricsFacade
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
index c2ef2c9..9121bdb 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
@@ -80,7 +80,7 @@ public class MetaDumpUtil {
             }
 
             KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
-            //upload metadata
+            // upload metadata
             new ResourceTool().copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
         }
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java
new file mode 100644
index 0000000..71f76cc
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.utils;
+
+import java.io.IOException;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RestService {
+    private static final Logger logger = LoggerFactory.getLogger(RestService.class);
+
+    private int connectionTimeout;
+    private int readTimeout;
+
+    public RestService(int connectionTimeout, int readTimeout) {
+        this.connectionTimeout = connectionTimeout;
+        this.readTimeout = readTimeout;
+
+    }
+
+    public String getRequest(String url) throws IOException {
+        return getRequest(url, connectionTimeout, readTimeout);
+    }
+
+
+    public String getRequest(String url, int connTimeout, int readTimeout) throws IOException {
+        HttpGet request = new HttpGet(url);
+        return execRequest(request, connTimeout, readTimeout);
+    }
+
+
+    private HttpClient getHttpClient(int connectionTimeout, int readTimeout) {
+        final HttpParams httpParams = new BasicHttpParams();
+        HttpConnectionParams.setSoTimeout(httpParams, readTimeout);
+        HttpConnectionParams.setConnectionTimeout(httpParams, connectionTimeout);
+
+        return new DefaultHttpClient(httpParams);
+    }
+
+    public String execRequest(HttpRequestBase request, int connectionTimeout, int readTimeout) throws IOException {
+        HttpClient httpClient = getHttpClient(connectionTimeout, readTimeout);
+        try {
+            HttpResponse response = httpClient.execute(request);
+            String msg = EntityUtils.toString(response.getEntity());
+            int code = response.getStatusLine().getStatusCode();
+            if (logger.isTraceEnabled()) {
+                String displayMessage;
+                if (msg.length() > 500) {
+                    displayMessage = msg.substring(0, 500);
+                } else {
+                    displayMessage = msg;
+                }
+                logger.trace("Send request: {}. And receive response[{}] which length is {}, and content is {}.", code,
+                        request.getRequestLine(), msg.length(), displayMessage);
+            }
+            if (code != 200)
+                throw new IOException("Invalid http response " + code + " when send request: "
+                        + request.getURI().toString() + "\n" + msg);
+            return msg;
+        } catch (IOException e) {
+            logger.error("error when send http request:" + request.getURI().toString(), e);
+            throw e;
+        } finally {
+            request.releaseConnection();
+        }
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
new file mode 100644
index 0000000..5b2caf4
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A client which ask application's state from Yarn's resource manager of Standalone master.
+ */
+object SparkApplicationClient extends Logging {
+
+  // Copied from org.apache.spark.deploy.master.ApplicationState
+  val finalStates = Set("FINISHED", "FAILED", "KILLED", "UNKNOWN")
+
+  // master is spark standalone and deployMode is cluster
+  val STANDALONE_CLUSTER: String = "standalone_cluster"
+
+  /**
+   * Report the state of an application which name is stepId until it has exited,
+   * either successfully or due to some failure, then return app state.
+   *
+   * Current only for spark job which its master is spark standalone and deployMode is cluster,
+   * because standalone client lack property "spark.standalone.submit.waitAppCompletion".
+   */
+  def awaitAndCheckAppState(sparkMaster: String, stepId: String): String = {
+    logInfo(s"AwaitAndCheckAppState $stepId{} ...")
+    sparkMaster match {
+      case STANDALONE_CLUSTER =>
+        var appState = StandaloneAppClient.getAppState(stepId)
+        while (true) {
+          logInfo(s"$stepId state is $appState .")
+          if (!finalStates.contains(appState)) {
+            Thread.sleep(10000)
+          }
+          appState = StandaloneAppClient.getAppState(stepId)
+        }
+        appState
+      case m => throw new UnsupportedOperationException("waitAndCheckAppState " + m)
+    }
+  }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala
new file mode 100644
index 0000000..15f456a
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.engine.spark.utils.RestService
+import org.apache.spark.internal.Logging
+
+import java.io.IOException
+import scala.collection.mutable
+import scala.util.parsing.json.JSON._
+
+
+object StandaloneAppClient extends Logging {
+
+  private val JOB_STEP_PREFIX = "job_step_"
+
+  // appId -> (appName, state, starttime)
+  private val cachedKylinJobMap: mutable.Map[String, (String, String, Long)] = new mutable.LinkedHashMap[String, (String, String, Long)]()
+
+  private var jobInfoUpdateTime = System.currentTimeMillis()
+  private val cacheTtl = 3600 * 1000 * 24 * 5
+  private val cacheMaxSize = 30000
+
+  // private val masterUrlHtml: String = KylinConfig.getInstanceFromEnv.getSparkStandaloneMasterWebUI + "/app/?appId="
+  private val masterUrlJson: String = KylinConfig.getInstanceFromEnv.getSparkStandaloneMasterWebUI + "/json"
+
+  private val restService: RestService = new RestService(10000, 10000)
+
+
+  /**
+   * @see org.apache.spark.deploy.master.ApplicationInfo
+   * @return Kylin's Build Job 's ApplicationInfo, update every 5 minutes
+   */
+  def getRunningJobs: mutable.Map[String, (String, String, Long)] = cachedKylinJobMap.synchronized {
+    val currMills = System.currentTimeMillis
+    if (cachedKylinJobMap.isEmpty || currMills - jobInfoUpdateTime >= 10000) {
+      logDebug("Updating app status ...")
+      try {
+        val realResp = restService.getRequest(masterUrlJson)
+        parseApplicationState(realResp)
+      } catch {
+        case ioe: IOException => logError("Can not connect to standalone master service.", ioe)
+        case e: Exception => logError("Error .", e)
+      }
+      jobInfoUpdateTime = currMills
+    }
+    cachedKylinJobMap
+  }
+
+  def getAppState(stepId: String): String = {
+    getRunningJobs
+
+    val doNothing: PartialFunction[(String, String, Long), (String, String, Long)] = {
+      case x => x
+    }
+    val res: Iterable[(String, String, Long)] = cachedKylinJobMap.values.filter(app => app._1.contains(stepId)).collect(doNothing)
+
+    res.size match {
+      case 0 => "SUBMITTED"
+      case 1 => res.head._2
+      case _ =>
+        // find the recent submitted application
+        res.maxBy(x => x._3)._2
+    }
+  }
+
+
+  def parseApplicationState(responseStr: String): Unit = {
+    val curr = System.currentTimeMillis()
+
+    var respJson = Map.empty[String, Any]
+    val tree = parseFull(responseStr)
+    respJson = tree match {
+      case Some(map: Map[String, Any]) => map
+    }
+    val app1 = respJson.getOrElse("completedapps", Array())
+    val completedApps = app1.asInstanceOf[List[Map[String, Any]]]
+
+    for (app <- completedApps) {
+      val name: String = app.getOrElse("name", "").asInstanceOf[String]
+      val id: String = app.getOrElse("id", "").asInstanceOf[String]
+      val state: String = app.getOrElse("state", "").asInstanceOf[String]
+      val startTime: Double = app.getOrElse("starttime", "0").asInstanceOf[Double]
+      if (name.contains(JOB_STEP_PREFIX)) {
+        cachedKylinJobMap(id) = (name, state, startTime.toLong)
+      }
+    }
+
+    // Clean too old jobs
+    if (cachedKylinJobMap.size > cacheMaxSize) {
+      for (id <- cachedKylinJobMap.keys) {
+        val app = cachedKylinJobMap.get(id)
+        if (app.isDefined && curr - app.get._3 > cacheTtl) {
+          cachedKylinJobMap.remove(id)
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html
new file mode 100644
index 0000000..9371cee
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html
@@ -0,0 +1,167 @@
+<!DOCTYPE html>
+<html>
+<head>
+    <meta http-equiv="Content-type" content="text/html; charset=utf-8"/>
+    <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css"/>
+    <link rel="stylesheet" href="/static/vis-timeline-graph2d.min.css" type="text/css"/>
+    <link rel="stylesheet" href="/static/webui.css" type="text/css"/>
+    <link rel="stylesheet" href="/static/timeline-view.css" type="text/css"/>
+    <script src="/static/sorttable.js"></script>
+    <script src="/static/jquery-1.12.4.min.js"></script>
+    <script src="/static/vis-timeline-graph2d.min.js"></script>
+    <script src="/static/bootstrap-tooltip.js"></script>
+    <script src="/static/initialize-tooltips.js"></script>
+    <script src="/static/table.js"></script>
+    <script src="/static/additional-metrics.js"></script>
+    <script src="/static/timeline-view.js"></script>
+    <script src="/static/log-view.js"></script>
+    <script src="/static/webui.js"></script>
+    <script>setUIRoot('')</script>
+
+    <link rel="shortcut icon" href="/static/spark-logo-77x50px-hd.png"></link>
+    <title>Application: job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01</title>
+</head>
+<body>
+<div class="container-fluid">
+    <div class="row-fluid">
+        <div class="span12">
+            <h3 style="vertical-align: middle; display: inline-block;">
+                <a style="text-decoration: none" href="/">
+                    <img src="/static/spark-logo-77x50px-hd.png"/>
+                    <span class="version" style="margin-right: 15px;">2.4.6</span>
+                </a>
+                Application: job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01
+            </h3>
+        </div>
+    </div>
+    <div class="row-fluid">
+        <div class="span12">
+            <ul class="unstyled">
+                <li><strong>ID:</strong> app-20210701234053-0002</li>
+                <li><strong>Name:</strong> job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01</li>
+                <li><strong>User:</strong> root</li>
+                <li><strong>Cores:</strong>
+                    Unlimited (4 granted)
+                </li>
+                <li>
+              <span data-toggle="tooltip" title="Maximum number of executors that this application will use. This limit is finite only when
+       dynamic allocation is enabled. The number of granted executors may exceed the limit
+       ephemerally when executors are being killed.
+    " data-placement="right">
+                <strong>Executor Limit: </strong>
+                Unlimited
+                (4 granted)
+              </span>
+                </li>
+                <li>
+                    <strong>Executor Memory:</strong>
+                    1024.0 MB
+                </li>
+                <li><strong>Submit Date:</strong> 2021/07/01 23:40:53</li>
+                <li><strong>State:</strong> FINISHED</li>
+
+            </ul>
+        </div>
+    </div>
+    <div class="row-fluid"> <!-- Executors -->
+        <div class="span12">
+          <span class="collapse-aggregated-executors collapse-table"
+                onClick="collapseTable('collapse-aggregated-executors','aggregated-executors')">
+            <h4>
+              <span class="collapse-table-arrow arrow-open"></span>
+              <a>Executor Summary (4)</a>
+            </h4>
+          </span>
+            <div class="aggregated-executors collapsible-table">
+                <table class="table table-bordered table-condensed table-striped sortable">
+                    <thead>
+                    <th width="" class="">ExecutorID</th>
+                    <th width="" class="">Worker</th>
+                    <th width="" class="">Cores</th>
+                    <th width="" class="">Memory</th>
+                    <th width="" class="">State</th>
+                    <th width="" class="">Logs</th>
+                    </thead>
+                    <tbody>
+
+                    </tbody>
+                </table>
+            </div>
+            <span class="collapse-aggregated-removedExecutors collapse-table" onClick="collapseTable('collapse-aggregated-removedExecutors',
+                  'aggregated-removedExecutors')">
+                <h4>
+                  <span class="collapse-table-arrow arrow-open"></span>
+                  <a>Removed Executors (4)</a>
+                </h4>
+              </span>
+            <div class="aggregated-removedExecutors collapsible-table">
+                <table class="table table-bordered table-condensed table-striped sortable">
+                    <thead>
+                    <th width="" class="">ExecutorID</th>
+                    <th width="" class="">Worker</th>
+                    <th width="" class="">Cores</th>
+                    <th width="" class="">Memory</th>
+                    <th width="" class="">State</th>
+                    <th width="" class="">Logs</th>
+                    </thead>
+                    <tbody>
+                    <tr>
+                        <td>2</td>
+                        <td>
+                            <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a>
+                        </td>
+                        <td>1</td>
+                        <td>1024</td>
+                        <td>KILLED</td>
+                        <td>
+                            <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&amp;executorId=2&amp;logType=stdout">stdout</a>
+                            <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&amp;executorId=2&amp;logType=stderr">stderr</a>
+                        </td>
+                    </tr>
+                    <tr>
+                        <td>1</td>
+                        <td>
+                            <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a>
+                        </td>
+                        <td>1</td>
+                        <td>1024</td>
+                        <td>KILLED</td>
+                        <td>
+                            <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&amp;executorId=1&amp;logType=stdout">stdout</a>
+                            <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&amp;executorId=1&amp;logType=stderr">stderr</a>
+                        </td>
+                    </tr>
+                    <tr>
+                        <td>3</td>
+                        <td>
+                            <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a>
+                        </td>
+                        <td>1</td>
+                        <td>1024</td>
+                        <td>KILLED</td>
+                        <td>
+                            <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&amp;executorId=3&amp;logType=stdout">stdout</a>
+                            <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&amp;executorId=3&amp;logType=stderr">stderr</a>
+                        </td>
+                    </tr>
+                    <tr>
+                        <td>0</td>
+                        <td>
+                            <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a>
+                        </td>
+                        <td>1</td>
+                        <td>1024</td>
+                        <td>KILLED</td>
+                        <td>
+                            <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&amp;executorId=0&amp;logType=stdout">stdout</a>
+                            <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&amp;executorId=0&amp;logType=stderr">stderr</a>
+                        </td>
+                    </tr>
+                    </tbody>
+                </table>
+            </div>
+        </div>
+    </div>
+</div>
+</body>
+</html>
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json
new file mode 100644
index 0000000..aa02873
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json
@@ -0,0 +1,94 @@
+{
+  "url": "spark://cdh-master:10030",
+  "workers": [
+    {
+      "id": "worker-20210630172110-10.1.3.90-10040",
+      "host": "10.1.3.90",
+      "port": 10040,
+      "webuiaddress": "http://10.1.3.90:10041",
+      "cores": 5,
+      "coresused": 0,
+      "coresfree": 5,
+      "memory": 20480,
+      "memoryused": 0,
+      "memoryfree": 20480,
+      "state": "ALIVE",
+      "lastheartbeat": 1625193717228
+    }
+  ],
+  "aliveworkers": 1,
+  "cores": 5,
+  "coresused": 0,
+  "memory": 20480,
+  "memoryused": 0,
+  "activeapps": [],
+  "completedapps": [
+    {
+      "id": "app-20210701230444-0000",
+      "starttime": 1625151884352,
+      "name": "job_step_6eb0d430-2882-4699-9915-1154959c2cd8-01",
+      "cores": 4,
+      "user": "root",
+      "memoryperslave": 1024,
+      "submitdate": "Thu Jul 01 23:04:44 CST 2021",
+      "state": "FINISHED",
+      "duration": 26068
+    },
+    {
+      "id": "app-20210701232026-0001",
+      "starttime": 1625152826024,
+      "name": "job_step_6eb0d430-2882-4699-9915-1154959c2cd8-01",
+      "cores": 4,
+      "user": "root",
+      "memoryperslave": 1024,
+      "submitdate": "Thu Jul 01 23:20:26 CST 2021",
+      "state": "FINISHED",
+      "duration": 29656
+    },
+    {
+      "id": "app-20210701234053-0002",
+      "starttime": 1625154053758,
+      "name": "job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01",
+      "cores": 4,
+      "user": "root",
+      "memoryperslave": 1024,
+      "submitdate": "Thu Jul 01 23:40:53 CST 2021",
+      "state": "FINISHED",
+      "duration": 27223
+    }
+  ],
+  "activedrivers": [],
+  "completeddrivers": [
+    {
+      "id": "driver-20210701230402-0000",
+      "starttime": "1625151842605",
+      "state": "FINISHED",
+      "cores": 1,
+      "memory": 1024,
+      "submitdate": "Thu Jul 01 23:04:02 CST 2021",
+      "worker": "worker-20210630172110-10.1.3.90-10040",
+      "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry"
+    },
+    {
+      "id": "driver-20210701232014-0001",
+      "starttime": "1625152814554",
+      "state": "FINISHED",
+      "cores": 1,
+      "memory": 1024,
+      "submitdate": "Thu Jul 01 23:20:14 CST 2021",
+      "worker": "worker-20210630172110-10.1.3.90-10040",
+      "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry"
+    },
+    {
+      "id": "driver-20210701233954-0002",
+      "starttime": "1625153994011",
+      "state": "FINISHED",
+      "cores": 1,
+      "memory": 1024,
+      "submitdate": "Thu Jul 01 23:39:54 CST 2021",
+      "worker": "worker-20210630172110-10.1.3.90-10040",
+      "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry"
+    }
+  ],
+  "status": "ALIVE"
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala
new file mode 100644
index 0000000..a1fabfe
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala
@@ -0,0 +1,34 @@
+package org.apache.spark.deploy
+
+import org.apache.commons.io.IOUtils
+import org.apache.spark.sql.common.{LocalMetadata, SparderBaseFunSuite}
+
+import java.io.{InputStream, StringWriter}
+
+class StandaloneAppClientTest extends SparderBaseFunSuite with LocalMetadata {
+
+  test("Test find state function from HTML and JSON response.") {
+    val htmlFile = getClass.getClassLoader.getResourceAsStream("response/application-detail-246.html")
+    val jsonFile = getClass.getClassLoader.getResourceAsStream("response/standalone-master-246.json")
+
+    def readF(f: InputStream): String = {
+      val writer = new StringWriter()
+      IOUtils.copy(f, writer, "UTF-8")
+      f.close()
+      writer.toString
+    }
+
+    val htmlStr = readF(htmlFile)
+    val jsonStr = readF(jsonFile)
+
+
+    StandaloneAppClient.parseApplicationState(jsonStr)
+    val res = StandaloneAppClient.getRunningJobs
+    val state = StandaloneAppClient.getAppState("6eb0d430-2882-4699-9915-1154959c2cd8")
+    assert("FINISHED".equals(state))
+
+    assert(res.size == 3)
+    assert(res.contains("app-20210701232026-0001"))
+    assert("FINISHED".equals(res.getOrElse("app-20210701232026-0001", ("", "", 1L))._2))
+  }
+}