Posted to commits@linkis.apache.org by pe...@apache.org on 2023/07/19 09:33:38 UTC

[linkis] branch master updated: Spark supports k8s operator submit task (#4767)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new b14cbf6e7 Spark supports k8s operator submit task (#4767)
b14cbf6e7 is described below

commit b14cbf6e7517d06daa2fad5f4bc42860933ebb69
Author: ChengJie1053 <18...@163.com>
AuthorDate: Wed Jul 19 17:33:32 2023 +0800

    Spark supports k8s operator submit task (#4767)
---
 linkis-engineconn-plugins/spark/pom.xml            |  27 +++
 .../spark/client/context/SparkConfig.java          | 137 ++++++++++++
 .../deployment/ClusterDescriptorAdapter.java       |   7 +
 .../ClusterDescriptorAdapterFactory.java           |   9 +
 ...KubernetesOperatorClusterDescriptorAdapter.java | 211 ++++++++++++++++++
 .../ApplicationState.java}                         |  25 ++-
 .../ConfigMap.java}                                |  16 +-
 .../spark/client/deployment/crds/DriverInfo.java   |  58 +++++
 .../HostPath.java}                                 |  26 ++-
 .../RestartPolicy.java}                            |  20 +-
 .../SparkApplication.java}                         |  22 +-
 .../SparkApplicationList.java}                     |  13 +-
 .../deployment/crds/SparkApplicationSpec.java      | 226 +++++++++++++++++++
 .../deployment/crds/SparkApplicationStatus.java    | 105 +++++++++
 .../spark/client/deployment/crds/SparkPodSpec.java | 238 +++++++++++++++++++++
 .../Volume.java}                                   |  36 +++-
 .../VolumeMount.java}                              |  26 ++-
 .../spark/client/deployment/util/FileUtils.java    | 201 +++++++++++++++++
 .../client/deployment/util/KubernetesHelper.java   |  82 +++++++
 .../spark/src/main/resources/spark-k8s-operator.md |  98 +++++++++
 .../spark/config/SparkConfiguration.scala          |  14 +-
 .../spark/executor/SparkSubmitOnceExecutor.scala   |   4 +-
 .../spark/factory/SparkEngineConnFactory.scala     |  16 +-
 pom.xml                                            |   3 +
 tool/dependencies/known-dependencies.txt           |  27 ++-
 25 files changed, 1582 insertions(+), 65 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index e5f8d0b05..add427443 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -392,6 +392,33 @@
         </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes-client.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.fabric8</groupId>
+          <artifactId>kubernetes-model-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.fabric8</groupId>
+          <artifactId>kubernetes-model-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-model-common</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-model-core</artifactId>
+      <version>5.4.1</version>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 20b0749d8..4e4654f92 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -25,6 +25,28 @@ public class SparkConfig {
   private String javaHome; // ("")
   private String sparkHome; // ("")
   private String master = "yarn"; // ("yarn")
+
+  private String k8sConfigFile;
+
+  private String k8sServiceAccount;
+
+  private String k8sMasterUrl;
+
+  private String k8sUsername;
+
+  private String k8sPassword;
+
+  private String k8sImage;
+
+  private String k8sImagePullPolicy;
+
+  private String k8sLanguageType;
+
+  private String k8sRestartPolicy;
+
+  private String k8sSparkVersion;
+
+  private String k8sNamespace;
   private String deployMode = "client"; // ("client") // todo cluster
   private String appResource; // ("")
   private String appName; // ("")
@@ -51,6 +73,94 @@ public class SparkConfig {
   private String keytab; // ("--keytab", "")
   private String queue; // ("--queue", "")
 
+  public String getK8sImagePullPolicy() {
+    return k8sImagePullPolicy;
+  }
+
+  public void setK8sImagePullPolicy(String k8sImagePullPolicy) {
+    this.k8sImagePullPolicy = k8sImagePullPolicy;
+  }
+
+  public String getK8sLanguageType() {
+    return k8sLanguageType;
+  }
+
+  public void setK8sLanguageType(String k8sLanguageType) {
+    this.k8sLanguageType = k8sLanguageType;
+  }
+
+  public String getK8sRestartPolicy() {
+    return k8sRestartPolicy;
+  }
+
+  public void setK8sRestartPolicy(String k8sRestartPolicy) {
+    this.k8sRestartPolicy = k8sRestartPolicy;
+  }
+
+  public String getK8sSparkVersion() {
+    return k8sSparkVersion;
+  }
+
+  public void setK8sSparkVersion(String k8sSparkVersion) {
+    this.k8sSparkVersion = k8sSparkVersion;
+  }
+
+  public String getK8sNamespace() {
+    return k8sNamespace;
+  }
+
+  public void setK8sNamespace(String k8sNamespace) {
+    this.k8sNamespace = k8sNamespace;
+  }
+
+  public String getK8sMasterUrl() {
+    return k8sMasterUrl;
+  }
+
+  public String getK8sConfigFile() {
+    return k8sConfigFile;
+  }
+
+  public void setK8sConfigFile(String k8sConfigFile) {
+    this.k8sConfigFile = k8sConfigFile;
+  }
+
+  public String getK8sServiceAccount() {
+    return k8sServiceAccount;
+  }
+
+  public void setK8sServiceAccount(String k8sServiceAccount) {
+    this.k8sServiceAccount = k8sServiceAccount;
+  }
+
+  public void setK8sMasterUrl(String k8sMasterUrl) {
+    this.k8sMasterUrl = k8sMasterUrl;
+  }
+
+  public String getK8sUsername() {
+    return k8sUsername;
+  }
+
+  public void setK8sUsername(String k8sUsername) {
+    this.k8sUsername = k8sUsername;
+  }
+
+  public String getK8sPassword() {
+    return k8sPassword;
+  }
+
+  public void setK8sPassword(String k8sPassword) {
+    this.k8sPassword = k8sPassword;
+  }
+
+  public String getK8sImage() {
+    return k8sImage;
+  }
+
+  public void setK8sImage(String k8sImage) {
+    this.k8sImage = k8sImage;
+  }
+
   public String getJavaHome() {
     return javaHome;
   }
@@ -287,6 +397,33 @@ public class SparkConfig {
         + ", master='"
         + master
         + '\''
+        + ", k8sConfigFile='"
+        + k8sConfigFile
+        + '\''
+        + ", k8sServiceAccount='"
+        + k8sServiceAccount
+        + '\''
+        + ", k8sMasterUrl='"
+        + k8sMasterUrl
+        + '\''
+        + ", k8sImage='"
+        + k8sImage
+        + '\''
+        + ", k8sImagePullPolicy='"
+        + k8sImagePullPolicy
+        + '\''
+        + ", k8sLanguageType='"
+        + k8sLanguageType
+        + '\''
+        + ", k8sRestartPolicy='"
+        + k8sRestartPolicy
+        + '\''
+        + ", k8sSparkVersion='"
+        + k8sSparkVersion
+        + '\''
+        + ", k8sNamespace='"
+        + k8sNamespace
+        + '\''
         + ", deployMode='"
         + deployMode
         + '\''
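
The new accessors cover everything the operator submission path needs: cluster
credentials (k8sConfigFile, or k8sMasterUrl plus k8sUsername/k8sPassword), the
target namespace and service account, and the image/runtime settings. A minimal
sketch of populating them (all values below are hypothetical; setMaster is
assumed to exist alongside the master field shown above):

    SparkConfig sparkConfig = new SparkConfig();
    sparkConfig.setMaster("k8s-operator"); // routes to the operator adapter (see factory change)
    sparkConfig.setK8sConfigFile("/opt/kube/config");   // hypothetical kubeconfig path
    sparkConfig.setK8sNamespace("spark-operator");
    sparkConfig.setK8sServiceAccount("spark");
    sparkConfig.setK8sImage("apache/spark:v3.2.1");     // hypothetical image tag
    sparkConfig.setK8sImagePullPolicy("IfNotPresent");
    sparkConfig.setK8sLanguageType("Scala");
    sparkConfig.setK8sRestartPolicy("Never");
    sparkConfig.setK8sSparkVersion("3.2.1");
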
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
index 16b7613cf..1ee44b27c 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
@@ -23,6 +23,8 @@ import org.apache.spark.launcher.CustomSparkSubmitLauncher;
 import org.apache.spark.launcher.SparkAppHandle;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +60,11 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
     return "ClusterDescriptorAdapter{" + "applicationId=" + sparkAppHandle.getAppId() + '}';
   }
 
+  public void deployCluster(String mainClass, String args, Map<String, String> confMap)
+      throws IOException {}
+
+  public abstract boolean initJobId();
+
   @Override
   public void close() {
     if (sparkAppHandle != null) {
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
index 8e9540dcb..91d3eafb6 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
@@ -19,11 +19,20 @@ package org.apache.linkis.engineplugin.spark.client.deployment;
 
 import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
 
+import org.apache.commons.lang3.StringUtils;
+
 public class ClusterDescriptorAdapterFactory {
 
   public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
+    String master = executionContext.getSparkConfig().getMaster();
+
     ClusterDescriptorAdapter clusterDescriptorAdapter =
         new YarnApplicationClusterDescriptorAdapter(executionContext);
+
+    if (StringUtils.isNotBlank(master) && master.equalsIgnoreCase("k8s-operator")) {
+      clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext);
+    }
+
     return clusterDescriptorAdapter;
   }
 }
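
With this change, a master value of "k8s-operator" (compared case-insensitively)
selects the new operator adapter, while every other value keeps the YARN
default. A hedged sketch of the dispatch (the ExecutionContext is assumed to
carry the SparkConfig shown earlier):

    ClusterDescriptorAdapter adapter = ClusterDescriptorAdapterFactory.create(executionContext);
    // master = "k8s-operator" -> KubernetesOperatorClusterDescriptorAdapter
    // anything else           -> YarnApplicationClusterDescriptorAdapter
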
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
new file mode 100644
index 000000000..c77fc655c
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment;
+
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
+import org.apache.linkis.engineplugin.spark.client.deployment.crds.*;
+import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition;
+import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+public class KubernetesOperatorClusterDescriptorAdapter extends ClusterDescriptorAdapter {
+
+  protected SparkConfig sparkConfig;
+  protected KubernetesClient client;
+
+  public KubernetesOperatorClusterDescriptorAdapter(ExecutionContext executionContext) {
+    super(executionContext);
+    this.sparkConfig = executionContext.getSparkConfig();
+    this.client =
+        KubernetesHelper.getKubernetesClient(
+            this.sparkConfig.getK8sConfigFile(),
+            this.sparkConfig.getK8sMasterUrl(),
+            this.sparkConfig.getK8sUsername(),
+            this.sparkConfig.getK8sPassword());
+  }
+
+  public void deployCluster(String mainClass, String args, Map<String, String> confMap) {
+
+    CustomResourceDefinitionList crds =
+        client.apiextensions().v1().customResourceDefinitions().list();
+
+    String sparkApplicationCRDName = CustomResource.getCRDName(SparkApplication.class);
+    List<CustomResourceDefinition> sparkCRDList =
+        crds.getItems().stream()
+            .filter(crd -> crd.getMetadata().getName().equals(sparkApplicationCRDName))
+            .collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(sparkCRDList)) {
+      throw new RuntimeException("The Spark operator CRD does not exist");
+    }
+
+    NonNamespaceOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
+        sparkApplicationClient = getSparkApplicationClient(client);
+    SparkApplication sparkApplication =
+        getSparkApplication(sparkConfig.getAppName(), sparkConfig.getK8sNamespace());
+
+    SparkPodSpec driver =
+        SparkPodSpec.Builder()
+            .cores(sparkConfig.getDriverCores())
+            .memory(sparkConfig.getDriverMemory())
+            .serviceAccount(sparkConfig.getK8sServiceAccount())
+            .build();
+    SparkPodSpec executor =
+        SparkPodSpec.Builder()
+            .cores(sparkConfig.getExecutorCores())
+            .instances(sparkConfig.getNumExecutors())
+            .memory(sparkConfig.getExecutorMemory())
+            .build();
+    SparkApplicationSpec sparkApplicationSpec =
+        SparkApplicationSpec.Builder()
+            .type(sparkConfig.getK8sLanguageType())
+            // TODO: An error occurs when client mode is used; the cause has not yet been identified
+            .mode("cluster")
+            .image(sparkConfig.getK8sImage())
+            .imagePullPolicy(sparkConfig.getK8sImagePullPolicy())
+            .mainClass(mainClass)
+            .mainApplicationFile(sparkConfig.getAppResource())
+            .sparkVersion(sparkConfig.getK8sSparkVersion())
+            .restartPolicy(new RestartPolicy(sparkConfig.getK8sRestartPolicy()))
+            .driver(driver)
+            .executor(executor)
+            .build();
+
+    sparkApplication.setSpec(sparkApplicationSpec);
+    SparkApplication created = sparkApplicationClient.createOrReplace(sparkApplication);
+
+    // Wait three seconds to get the status
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+    }
+
+    SparkApplicationList list = getSparkApplicationClient(client).list();
+
+    List<SparkApplication> sparkApplicationList =
+        list.getItems().stream()
+            .filter(crd -> crd.getMetadata().getName().equals(sparkConfig.getAppName()))
+            .collect(Collectors.toList());
+
+    if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
+      SparkApplicationStatus status = sparkApplicationList.get(0).getStatus();
+      if (Objects.nonNull(status)) {
+        logger.info(
+            "Spark k8s task: {},status: {}",
+            sparkConfig.getAppName(),
+            status.getApplicationState().getState());
+      }
+    }
+  }
+
+  public boolean initJobId() {
+    SparkApplicationStatus sparkApplicationStatus = getKubernetesOperatorState();
+
+    if (Objects.nonNull(sparkApplicationStatus)) {
+      this.applicationId = sparkApplicationStatus.getSparkApplicationId();
+      this.jobState =
+          kubernetesOperatorStateConvertSparkState(
+              sparkApplicationStatus.getApplicationState().getState());
+    }
+
+    // When the job has not finished, monitor the appId; otherwise monitor the state. This mainly
+    // guards against a job finishing very early, which would otherwise leave us waiting forever.
+    return null != getApplicationId() || (jobState != null && jobState.isFinal());
+  }
+
+  private SparkApplicationStatus getKubernetesOperatorState() {
+    List<SparkApplication> sparkApplicationList =
+        getSparkApplicationClient(client).list().getItems();
+    if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
+      for (SparkApplication sparkApplication : sparkApplicationList) {
+        if (sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace())
+            && sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName())) {
+          return sparkApplication.getStatus();
+        }
+      }
+    }
+    return null;
+  }
+
+  public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String kubernetesState) {
+    if (StringUtils.isBlank(kubernetesState)) {
+      return SparkAppHandle.State.UNKNOWN;
+    }
+    switch (kubernetesState) {
+      case "PENDING":
+        return SparkAppHandle.State.CONNECTED;
+      case "RUNNING":
+        return SparkAppHandle.State.RUNNING;
+      case "COMPLETED":
+        return SparkAppHandle.State.FINISHED;
+      case "FAILED":
+        return SparkAppHandle.State.FAILED;
+      default:
+        return SparkAppHandle.State.UNKNOWN;
+    }
+  }
+
+  public boolean isDisposed() {
+    return this.jobState != null && this.jobState.isFinal();
+  }
+
+  @Override
+  public String toString() {
+    return "ClusterDescriptorAdapter{" + "applicationId=" + getApplicationId() + '}';
+  }
+
+  @Override
+  public void close() {
+    logger.info("Start to close job {}.", getApplicationId());
+    SparkApplication sparkApplication =
+        getSparkApplication(this.sparkConfig.getAppName(), this.sparkConfig.getK8sNamespace());
+    getSparkApplicationClient(client).delete(sparkApplication);
+    client.close();
+  }
+
+  public static NonNamespaceOperation<
+          SparkApplication, SparkApplicationList, Resource<SparkApplication>>
+      getSparkApplicationClient(KubernetesClient client) {
+    return client.customResources(SparkApplication.class, SparkApplicationList.class);
+  }
+
+  public static SparkApplication getSparkApplication(String sparkOperatorName, String namespace) {
+    SparkApplication sparkApplication = new SparkApplication();
+    ObjectMeta metadata = new ObjectMeta();
+    metadata.setName(sparkOperatorName);
+    metadata.setNamespace(namespace);
+    sparkApplication.setMetadata(metadata);
+    return sparkApplication;
+  }
+}
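
Taken together, the adapter implements a submit-then-poll lifecycle:
deployCluster creates (or replaces) the SparkApplication custom resource,
initJobId reads the operator-reported status back, and close deletes the
resource again. A hedged usage sketch (mainClass and args are hypothetical
examples):

    static void runOnce(ExecutionContext executionContext) throws Exception {
      KubernetesOperatorClusterDescriptorAdapter adapter =
          new KubernetesOperatorClusterDescriptorAdapter(executionContext);
      adapter.deployCluster("org.apache.spark.examples.SparkPi", "", new java.util.HashMap<>());
      // Poll until the operator reports an application id or a final state
      while (!adapter.initJobId()) {
        Thread.sleep(1000);
      }
      adapter.close(); // deletes the SparkApplication CR and closes the k8s client
    }
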
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ApplicationState.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ApplicationState.java
index 8e9540dcb..ac59f75be 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ApplicationState.java
@@ -15,15 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class ApplicationState {
 
-public class ClusterDescriptorAdapterFactory {
+  private String state;
+  private String errorMessage;
 
-  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
-    ClusterDescriptorAdapter clusterDescriptorAdapter =
-        new YarnApplicationClusterDescriptorAdapter(executionContext);
-    return clusterDescriptorAdapter;
+  public String getState() {
+    return state;
+  }
+
+  public void setState(String state) {
+    this.state = state;
+  }
+
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+
+  public void setErrorMessage(String errorMessage) {
+    this.errorMessage = errorMessage;
   }
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ConfigMap.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ConfigMap.java
index 8e9540dcb..4ea04d7c5 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ConfigMap.java
@@ -15,15 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class ConfigMap {
 
-public class ClusterDescriptorAdapterFactory {
+  private String name;
 
-  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
-    ClusterDescriptorAdapter clusterDescriptorAdapter =
-        new YarnApplicationClusterDescriptorAdapter(executionContext);
-    return clusterDescriptorAdapter;
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
   }
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/DriverInfo.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/DriverInfo.java
new file mode 100644
index 000000000..0484e2390
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/DriverInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+
+public class DriverInfo {
+
+  private String podName;
+  private String webUIAddress;
+  private String webUIPort;
+  private String webUIServiceName;
+
+  public String getWebUIServiceName() {
+    return webUIServiceName;
+  }
+
+  public void setWebUIServiceName(String webUIServiceName) {
+    this.webUIServiceName = webUIServiceName;
+  }
+
+  public String getWebUIPort() {
+    return webUIPort;
+  }
+
+  public void setWebUIPort(String webUIPort) {
+    this.webUIPort = webUIPort;
+  }
+
+  public String getWebUIAddress() {
+    return webUIAddress;
+  }
+
+  public void setWebUIAddress(String webUIAddress) {
+    this.webUIAddress = webUIAddress;
+  }
+
+  public String getPodName() {
+    return podName;
+  }
+
+  public void setPodName(String podName) {
+    this.podName = podName;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/HostPath.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/HostPath.java
index 8e9540dcb..053150352 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/HostPath.java
@@ -15,15 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class HostPath {
 
-public class ClusterDescriptorAdapterFactory {
+  private String path;
 
-  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
-    ClusterDescriptorAdapter clusterDescriptorAdapter =
-        new YarnApplicationClusterDescriptorAdapter(executionContext);
-    return clusterDescriptorAdapter;
+  private String type;
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
   }
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/RestartPolicy.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/RestartPolicy.java
index 8e9540dcb..276505af8 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/RestartPolicy.java
@@ -15,15 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class RestartPolicy {
 
-public class ClusterDescriptorAdapterFactory {
+  private String type;
 
-  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
-    ClusterDescriptorAdapter clusterDescriptorAdapter =
-        new YarnApplicationClusterDescriptorAdapter(executionContext);
-    return clusterDescriptorAdapter;
+  public RestartPolicy(String type) {
+    this.type = type;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
   }
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplication.java
similarity index 55%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplication.java
index 8e9540dcb..6a0b143a7 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplication.java
@@ -15,15 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.Kind;
+import io.fabric8.kubernetes.model.annotation.Version;
 
-public class ClusterDescriptorAdapterFactory {
+@Version(SparkApplication.VERSION)
+@Group(SparkApplication.GROUP)
+@Kind(SparkApplication.KIND)
+public class SparkApplication extends CustomResource<SparkApplicationSpec, SparkApplicationStatus>
+    implements Namespaced {
+  public static final String GROUP = "sparkoperator.k8s.io";
+  public static final String VERSION = "v1beta2";
 
-  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
-    ClusterDescriptorAdapter clusterDescriptorAdapter =
-        new YarnApplicationClusterDescriptorAdapter(executionContext);
-    return clusterDescriptorAdapter;
-  }
+  public static final String KIND = "SparkApplication";
 }
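
The @Group/@Version/@Kind annotations bind this class to the CRD that the
spark-on-k8s-operator installs (sparkapplications.sparkoperator.k8s.io at
v1beta2), so fabric8 can serve it through a typed client, exactly as the
adapter above does. A minimal sketch (client construction is simplified to the
default kubeconfig):

    KubernetesClient client = new DefaultKubernetesClient();
    NonNamespaceOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>> ops =
        client.customResources(SparkApplication.class, SparkApplicationList.class);
    ops.list().getItems()
        .forEach(app -> System.out.println(app.getMetadata().getName()));
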
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationList.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationList.java
index 8e9540dcb..4e70f6bc1 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationList.java
@@ -15,15 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import io.fabric8.kubernetes.client.CustomResourceList;
 
-public class ClusterDescriptorAdapterFactory {
-
-  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
-    ClusterDescriptorAdapter clusterDescriptorAdapter =
-        new YarnApplicationClusterDescriptorAdapter(executionContext);
-    return clusterDescriptorAdapter;
-  }
-}
+public class SparkApplicationList extends CustomResourceList<SparkApplication> {}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
new file mode 100644
index 000000000..b170233b5
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+
+import java.util.List;
+
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+
+public class SparkApplicationSpec implements KubernetesResource {
+
+  private String type;
+
+  private String mode;
+
+  private String image;
+
+  private String imagePullPolicy;
+
+  private String mainClass;
+
+  private String mainApplicationFile;
+
+  private String sparkVersion;
+
+  private RestartPolicy restartPolicy;
+
+  private List<Volume> volumes;
+
+  private SparkPodSpec driver;
+
+  private SparkPodSpec executor;
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public String getMode() {
+    return mode;
+  }
+
+  public void setMode(String mode) {
+    this.mode = mode;
+  }
+
+  public String getImage() {
+    return image;
+  }
+
+  public void setImage(String image) {
+    this.image = image;
+  }
+
+  public String getImagePullPolicy() {
+    return imagePullPolicy;
+  }
+
+  public void setImagePullPolicy(String imagePullPolicy) {
+    this.imagePullPolicy = imagePullPolicy;
+  }
+
+  public String getMainClass() {
+    return mainClass;
+  }
+
+  public void setMainClass(String mainClass) {
+    this.mainClass = mainClass;
+  }
+
+  public String getMainApplicationFile() {
+    return mainApplicationFile;
+  }
+
+  public void setMainApplicationFile(String mainApplicationFile) {
+    this.mainApplicationFile = mainApplicationFile;
+  }
+
+  public String getSparkVersion() {
+    return sparkVersion;
+  }
+
+  public void setSparkVersion(String sparkVersion) {
+    this.sparkVersion = sparkVersion;
+  }
+
+  public RestartPolicy getRestartPolicy() {
+    return restartPolicy;
+  }
+
+  public void setRestartPolicy(RestartPolicy restartPolicy) {
+    this.restartPolicy = restartPolicy;
+  }
+
+  public List<Volume> getVolumes() {
+    return volumes;
+  }
+
+  public void setVolumes(List<Volume> volumes) {
+    this.volumes = volumes;
+  }
+
+  public SparkPodSpec getDriver() {
+    return driver;
+  }
+
+  public void setDriver(SparkPodSpec driver) {
+    this.driver = driver;
+  }
+
+  public SparkPodSpec getExecutor() {
+    return executor;
+  }
+
+  public void setExecutor(SparkPodSpec executor) {
+    this.executor = executor;
+  }
+
+  public static SparkApplicationSpecBuilder Builder() {
+    return new SparkApplicationSpecBuilder();
+  }
+
+  public static class SparkApplicationSpecBuilder {
+    private String type;
+    private String mode;
+    private String image;
+    private String imagePullPolicy;
+    private String mainClass;
+    private String mainApplicationFile;
+    private String sparkVersion;
+    private RestartPolicy restartPolicy;
+    private List<Volume> volumes;
+    private SparkPodSpec driver;
+    private SparkPodSpec executor;
+
+    private SparkApplicationSpecBuilder() {}
+
+    public SparkApplicationSpecBuilder type(String type) {
+      this.type = type;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder mode(String mode) {
+      this.mode = mode;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder image(String image) {
+      this.image = image;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder imagePullPolicy(String imagePullPolicy) {
+      this.imagePullPolicy = imagePullPolicy;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder mainClass(String mainClass) {
+      this.mainClass = mainClass;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder mainApplicationFile(String mainApplicationFile) {
+      this.mainApplicationFile = mainApplicationFile;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder sparkVersion(String sparkVersion) {
+      this.sparkVersion = sparkVersion;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder restartPolicy(RestartPolicy restartPolicy) {
+      this.restartPolicy = restartPolicy;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder volumes(List<Volume> volumes) {
+      this.volumes = volumes;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder driver(SparkPodSpec driver) {
+      this.driver = driver;
+      return this;
+    }
+
+    public SparkApplicationSpecBuilder executor(SparkPodSpec executor) {
+      this.executor = executor;
+      return this;
+    }
+
+    public SparkApplicationSpec build() {
+      SparkApplicationSpec sparkApplicationSpec = new SparkApplicationSpec();
+      sparkApplicationSpec.type = this.type;
+      sparkApplicationSpec.mainClass = this.mainClass;
+      sparkApplicationSpec.imagePullPolicy = this.imagePullPolicy;
+      sparkApplicationSpec.volumes = this.volumes;
+      sparkApplicationSpec.driver = this.driver;
+      sparkApplicationSpec.sparkVersion = this.sparkVersion;
+      sparkApplicationSpec.mode = this.mode;
+      sparkApplicationSpec.mainApplicationFile = this.mainApplicationFile;
+      sparkApplicationSpec.executor = this.executor;
+      sparkApplicationSpec.image = this.image;
+      sparkApplicationSpec.restartPolicy = this.restartPolicy;
+      return sparkApplicationSpec;
+    }
+  }
+}
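
The builder mirrors the operator's SparkApplication spec one-to-one. A minimal
sketch of assembling a spec (all values are hypothetical), matching how the
adapter above wires driver and executor pods in:

    SparkApplicationSpec spec =
        SparkApplicationSpec.Builder()
            .type("Scala")
            .mode("cluster")
            .image("apache/spark:v3.2.1")
            .imagePullPolicy("IfNotPresent")
            .mainClass("org.apache.spark.examples.SparkPi")
            .mainApplicationFile("local:///opt/spark/examples/jars/spark-examples.jar")
            .sparkVersion("3.2.1")
            .restartPolicy(new RestartPolicy("Never"))
            .driver(SparkPodSpec.Builder().cores(1).memory("1g").serviceAccount("spark").build())
            .executor(SparkPodSpec.Builder().cores(1).instances(2).memory("1g").build())
            .build();
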
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationStatus.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationStatus.java
new file mode 100644
index 000000000..c7668ea33
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationStatus.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+
+import java.util.Map;
+
+public class SparkApplicationStatus {
+
+  private String sparkApplicationId;
+  private String terminationTime;
+  private String lastSubmissionAttemptTime;
+  private String submissionID;
+  private ApplicationState applicationState;
+  private Integer executionAttempts;
+  private Integer submissionAttempts;
+  private DriverInfo driverInfo;
+  private Map<String, String> executorState;
+
+  public String getSubmissionID() {
+    return submissionID;
+  }
+
+  public void setSubmissionID(String submissionID) {
+    this.submissionID = submissionID;
+  }
+
+  public Integer getSubmissionAttempts() {
+    return submissionAttempts;
+  }
+
+  public void setSubmissionAttempts(Integer submissionAttempts) {
+    this.submissionAttempts = submissionAttempts;
+  }
+
+  public String getLastSubmissionAttemptTime() {
+    return lastSubmissionAttemptTime;
+  }
+
+  public void setLastSubmissionAttemptTime(String lastSubmissionAttemptTime) {
+    this.lastSubmissionAttemptTime = lastSubmissionAttemptTime;
+  }
+
+  public String getSparkApplicationId() {
+    return sparkApplicationId;
+  }
+
+  public void setSparkApplicationId(String sparkApplicationId) {
+    this.sparkApplicationId = sparkApplicationId;
+  }
+
+  public String getTerminationTime() {
+    return terminationTime;
+  }
+
+  public void setTerminationTime(String terminationTime) {
+    this.terminationTime = terminationTime;
+  }
+
+  public ApplicationState getApplicationState() {
+    return applicationState;
+  }
+
+  public void setApplicationState(ApplicationState applicationState) {
+    this.applicationState = applicationState;
+  }
+
+  public Integer getExecutionAttempts() {
+    return executionAttempts;
+  }
+
+  public void setExecutionAttempts(Integer executionAttempts) {
+    this.executionAttempts = executionAttempts;
+  }
+
+  public DriverInfo getDriverInfo() {
+    return driverInfo;
+  }
+
+  public void setDriverInfo(DriverInfo driverInfo) {
+    this.driverInfo = driverInfo;
+  }
+
+  public Map<String, String> getExecutorState() {
+    return executorState;
+  }
+
+  public void setExecutorState(Map<String, String> executorState) {
+    this.executorState = executorState;
+  }
+}
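
Only applicationState and sparkApplicationId are consumed by the adapter above;
the remaining fields simply mirror the operator's status block. A small hedged
helper for deciding whether a status is terminal (the state strings follow the
kubernetesOperatorStateConvertSparkState mapping):

    static boolean isFinal(SparkApplicationStatus status) {
      if (status == null || status.getApplicationState() == null) {
        return false;
      }
      String state = status.getApplicationState().getState();
      return "COMPLETED".equals(state) || "FAILED".equals(state);
    }
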
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkPodSpec.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkPodSpec.java
new file mode 100644
index 000000000..6c7b281a9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkPodSpec.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+
+import java.util.List;
+import java.util.Map;
+
+public class SparkPodSpec {
+
+  private Integer cores;
+
+  private String coreLimit;
+
+  private String memory;
+
+  private Map<String, String> labels;
+
+  private Map<String, String> annotations;
+
+  private String serviceAccount;
+
+  private List<VolumeMount> volumeMounts;
+
+  private Integer instances;
+
+  public Integer getCores() {
+    return cores;
+  }
+
+  public void setCores(Integer cores) {
+    this.cores = cores;
+  }
+
+  public String getCoreLimit() {
+    return coreLimit;
+  }
+
+  public void setCoreLimit(String coreLimit) {
+    this.coreLimit = coreLimit;
+  }
+
+  public String getMemory() {
+    return memory;
+  }
+
+  public void setMemory(String memory) {
+    this.memory = memory;
+  }
+
+  public Map<String, String> getLabels() {
+    return labels;
+  }
+
+  public void setLabels(Map<String, String> labels) {
+    this.labels = labels;
+  }
+
+  public Map<String, String> getAnnotations() {
+    return annotations;
+  }
+
+  public void setAnnotations(Map<String, String> annotations) {
+    this.annotations = annotations;
+  }
+
+  public String getServiceAccount() {
+    return serviceAccount;
+  }
+
+  public void setServiceAccount(String serviceAccount) {
+    this.serviceAccount = serviceAccount;
+  }
+
+  public List<VolumeMount> getVolumeMounts() {
+    return volumeMounts;
+  }
+
+  public void setVolumeMounts(List<VolumeMount> volumeMounts) {
+    this.volumeMounts = volumeMounts;
+  }
+
+  public Integer getInstances() {
+    return instances;
+  }
+
+  public void setInstances(Integer instances) {
+    this.instances = instances;
+  }
+
+  public static SparkPodSpecBuilder Builder() {
+    return new SparkPodSpecBuilder();
+  }
+
+  public static class SparkPodSpecBuilder {
+    private Integer cores;
+    private String coreLimit;
+    private String memory;
+    private Map<String, String> labels;
+    private Map<String, String> annotations;
+    private String serviceAccount;
+    private List<VolumeMount> volumeMounts;
+    private Integer instances;
+
+    public Integer getCores() {
+      return cores;
+    }
+
+    public void setCores(Integer cores) {
+      this.cores = cores;
+    }
+
+    public String getCoreLimit() {
+      return coreLimit;
+    }
+
+    public void setCoreLimit(String coreLimit) {
+      this.coreLimit = coreLimit;
+    }
+
+    public String getMemory() {
+      return memory;
+    }
+
+    public void setMemory(String memory) {
+      this.memory = memory;
+    }
+
+    public Map<String, String> getLabels() {
+      return labels;
+    }
+
+    public void setLabels(Map<String, String> labels) {
+      this.labels = labels;
+    }
+
+    public Map<String, String> getAnnotations() {
+      return annotations;
+    }
+
+    public void setAnnotations(Map<String, String> annotations) {
+      this.annotations = annotations;
+    }
+
+    public String getServiceAccount() {
+      return serviceAccount;
+    }
+
+    public void setServiceAccount(String serviceAccount) {
+      this.serviceAccount = serviceAccount;
+    }
+
+    public List<VolumeMount> getVolumeMounts() {
+      return volumeMounts;
+    }
+
+    public void setVolumeMounts(List<VolumeMount> volumeMounts) {
+      this.volumeMounts = volumeMounts;
+    }
+
+    public Integer getInstances() {
+      return instances;
+    }
+
+    public void setInstances(Integer instances) {
+      this.instances = instances;
+    }
+
+    private SparkPodSpecBuilder() {}
+
+    public SparkPodSpecBuilder cores(Integer cores) {
+      this.cores = cores;
+      return this;
+    }
+
+    public SparkPodSpecBuilder coreLimit(String coreLimit) {
+      this.coreLimit = coreLimit;
+      return this;
+    }
+
+    public SparkPodSpecBuilder memory(String memory) {
+      this.memory = memory;
+      return this;
+    }
+
+    public SparkPodSpecBuilder labels(Map<String, String> labels) {
+      this.labels = labels;
+      return this;
+    }
+
+    public SparkPodSpecBuilder annotations(Map<String, String> annotations) {
+      this.annotations = annotations;
+      return this;
+    }
+
+    public SparkPodSpecBuilder serviceAccount(String serviceAccount) {
+      this.serviceAccount = serviceAccount;
+      return this;
+    }
+
+    public SparkPodSpecBuilder volumeMounts(List<VolumeMount> volumeMounts) {
+      this.volumeMounts = volumeMounts;
+      return this;
+    }
+
+    public SparkPodSpecBuilder instances(Integer instances) {
+      this.instances = instances;
+      return this;
+    }
+
+    public SparkPodSpec build() {
+      SparkPodSpec sparkPodSpec = new SparkPodSpec();
+      sparkPodSpec.annotations = this.annotations;
+      sparkPodSpec.coreLimit = this.coreLimit;
+      sparkPodSpec.instances = this.instances;
+      sparkPodSpec.labels = this.labels;
+      sparkPodSpec.serviceAccount = this.serviceAccount;
+      sparkPodSpec.cores = this.cores;
+      sparkPodSpec.memory = this.memory;
+      sparkPodSpec.volumeMounts = this.volumeMounts;
+      return sparkPodSpec;
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/Volume.java
similarity index 58%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/Volume.java
index 8e9540dcb..4410fe80f 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/Volume.java
@@ -15,15 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class Volume {
 
-public class ClusterDescriptorAdapterFactory {
+  private String name;
 
-  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
-    ClusterDescriptorAdapter clusterDescriptorAdapter =
-        new YarnApplicationClusterDescriptorAdapter(executionContext);
-    return clusterDescriptorAdapter;
+  private HostPath hostPath;
+
+  private ConfigMap configMap;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public HostPath getHostPath() {
+    return hostPath;
+  }
+
+  public void setHostPath(HostPath hostPath) {
+    this.hostPath = hostPath;
+  }
+
+  public ConfigMap getConfigMap() {
+    return configMap;
+  }
+
+  public void setConfigMap(ConfigMap configMap) {
+    this.configMap = configMap;
   }
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/VolumeMount.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/VolumeMount.java
index 8e9540dcb..a4b2fb70c 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/VolumeMount.java
@@ -15,15 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class VolumeMount {
 
-public class ClusterDescriptorAdapterFactory {
+  private String name;
 
-  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
-    ClusterDescriptorAdapter clusterDescriptorAdapter =
-        new YarnApplicationClusterDescriptorAdapter(executionContext);
-    return clusterDescriptorAdapter;
+  private String mountPath;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getMountPath() {
+    return mountPath;
+  }
+
+  public void setMountPath(String mountPath) {
+    this.mountPath = mountPath;
   }
 }
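
Volume, HostPath, ConfigMap, and VolumeMount together cover the operator's
volume section: volumes are declared once on the application spec and then
mounted per pod. A hedged sketch (names and paths are hypothetical):

    HostPath hostPath = new HostPath();
    hostPath.setPath("/tmp/spark-local");
    hostPath.setType("DirectoryOrCreate");

    Volume volume = new Volume();
    volume.setName("spark-local-dir");
    volume.setHostPath(hostPath);

    VolumeMount mount = new VolumeMount();
    mount.setName("spark-local-dir");
    mount.setMountPath("/tmp/spark-local");

    SparkPodSpec driver =
        SparkPodSpec.Builder()
            .cores(1)
            .memory("1g")
            .volumeMounts(java.util.Collections.singletonList(mount))
            .build();

    SparkApplicationSpec spec =
        SparkApplicationSpec.Builder()
            .volumes(java.util.Collections.singletonList(volume))
            .driver(driver)
            .build();
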
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/FileUtils.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/FileUtils.java
new file mode 100644
index 000000000..2b311b91b
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/FileUtils.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.Stream;
+
+/**
+ * This is a utility class for dealing with files and directories. Contains utilities for recursive deletion
+ * and creation of temporary files.
+ */
+public final class FileUtils {
+
+  /** Global lock to prevent concurrent directory deletes on Windows and macOS. */
+  private static final Object DELETE_LOCK = new Object();
+
+  /** The alphabet to construct the random part of the filename from. */
+  private static final char[] ALPHABET = {
+    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f'
+  };
+
+  /** The length of the random part of the filename. */
+  private static final int RANDOM_FILE_NAME_LENGTH = 12;
+
+  /**
+   * The maximum size of array to allocate for reading. See {@code MAX_BUFFER_SIZE} in {@link Files}
+   * for more.
+   */
+  private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+  /** The size of the buffer used for reading. */
+  private static final int BUFFER_SIZE = 4096;
+
+  private static final String JAR_FILE_EXTENSION = "jar";
+
+  public static final String CLASS_FILE_EXTENSION = "class";
+
+  public static final String PACKAGE_SEPARATOR = ".";
+
+  // ------------------------------------------------------------------------
+
+  public static void writeCompletely(WritableByteChannel channel, ByteBuffer src)
+      throws IOException {
+    while (src.hasRemaining()) {
+      channel.write(src);
+    }
+  }
+
+  // ------------------------------------------------------------------------
+
+  /** Lists the given directory in a resource-leak-safe way. */
+  public static Path[] listDirectory(Path directory) throws IOException {
+    try (Stream<Path> stream = Files.list(directory)) {
+      return stream.toArray(Path[]::new);
+    }
+  }
+
+  // ------------------------------------------------------------------------
+
+  /**
+   * Constructs a random filename with the given prefix and a random part generated from hex
+   * characters.
+   *
+   * @param prefix the prefix to the filename to be constructed
+   * @return the generated random filename with the given prefix
+   */
+  public static String getRandomFilename(final String prefix) {
+    final Random rnd = new Random();
+    final StringBuilder stringBuilder = new StringBuilder(prefix);
+
+    for (int i = 0; i < RANDOM_FILE_NAME_LENGTH; i++) {
+      stringBuilder.append(ALPHABET[rnd.nextInt(ALPHABET.length)]);
+    }
+
+    return stringBuilder.toString();
+  }
+
+  // ------------------------------------------------------------------------
+  //  Simple reading and writing of files
+  // ------------------------------------------------------------------------
+
+  public static String readFile(File file, String charsetName) throws IOException {
+    byte[] bytes = readAllBytes(file.toPath());
+    return new String(bytes, charsetName);
+  }
+
+  public static String readFileUtf8(File file) throws IOException {
+    return readFile(file, "UTF-8");
+  }
+
+  public static void writeFile(File file, String contents, String encoding) throws IOException {
+    byte[] bytes = contents.getBytes(encoding);
+    Files.write(
+        file.toPath(), bytes,
+        StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
+  }
+
+  public static void writeFileUtf8(File file, String contents) throws IOException {
+    writeFile(file, contents, "UTF-8");
+  }
+
+  /**
+   * Reads all the bytes from a file. The method ensures that the file is closed when all bytes have
+   * been read or an I/O error, or other runtime exception, is thrown.
+   *
+   * <p>This is an implementation that follows {@link Files#readAllBytes(Path)}; the difference
+   * is that it limits the size of the direct buffer to avoid direct-buffer OutOfMemoryError. Once
+   * {@link Files#readAllBytes(Path)} or another JDK API can do this, this method should be
+   * removed.
+   *
+   * @param path the path to the file
+   * @return a byte array containing the bytes read from the file
+   * @throws IOException if an I/O error occurs reading from the stream
+   * @throws OutOfMemoryError if an array of the required size cannot be allocated, for example if
+   *     the file is larger than {@code 2GB}
+   */
+  public static byte[] readAllBytes(Path path) throws IOException {
+    try (SeekableByteChannel channel = Files.newByteChannel(path);
+        InputStream in = Channels.newInputStream(channel)) {
+
+      long size = channel.size();
+      if (size > (long) MAX_BUFFER_SIZE) {
+        throw new OutOfMemoryError("Required array size too large");
+      }
+
+      return read(in, (int) size);
+    }
+  }
+
+  /**
+   * Reads all the bytes from an input stream. Uses {@code initialSize} as a hint about how many
+   * bytes the stream will have and reads in chunks of at most {@code BUFFER_SIZE} bytes to limit
+   * the size of the direct buffer used to read.
+   *
+   * @param source the input stream to read from
+   * @param initialSize the initial size of the byte array to allocate
+   * @return a byte array containing the bytes read from the stream
+   * @throws IOException if an I/O error occurs reading from the stream
+   * @throws OutOfMemoryError if an array of the required size cannot be allocated
+   */
+  private static byte[] read(InputStream source, int initialSize) throws IOException {
+    int capacity = initialSize;
+    byte[] buf = new byte[capacity];
+    int nread = 0;
+    int n;
+
+    for (; ; ) {
+      // read to EOF which may read more or less than initialSize (eg: file
+      // is truncated while we are reading)
+      while ((n = source.read(buf, nread, Math.min(capacity - nread, BUFFER_SIZE))) > 0) {
+        nread += n;
+      }
+
+      // if last call to source.read() returned -1, we are done
+      // otherwise, try to read one more byte; if that failed we're done too
+      if (n < 0 || (n = source.read()) < 0) {
+        break;
+      }
+
+      // one more byte was read; need to allocate a larger buffer
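+      // the subtraction form avoids the int overflow that doubling capacity directly could cause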
+      if (capacity <= MAX_BUFFER_SIZE - capacity) {
+        capacity = Math.max(capacity << 1, BUFFER_SIZE);
+      } else {
+        if (capacity == MAX_BUFFER_SIZE) {
+          throw new OutOfMemoryError("Required array size too large");
+        }
+        capacity = MAX_BUFFER_SIZE;
+      }
+      buf = Arrays.copyOf(buf, capacity);
+      buf[nread++] = (byte) n;
+    }
+    return (capacity == nread) ? buf : Arrays.copyOf(buf, nread);
+  }
+
+  /** Private default constructor to avoid instantiation. */
+  private FileUtils() {}
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/KubernetesHelper.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/KubernetesHelper.java
new file mode 100644
index 000000000..04b82a843
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/KubernetesHelper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+
+import io.fabric8.kubernetes.client.*;
+
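+/**
+ * Builds fabric8 {@link KubernetesClient} instances. The configuration sources are tried in
+ * order: an explicit kubeconfig file first, then master URL with username/password, and finally
+ * master URL alone.
+ */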
+public class KubernetesHelper {
+
+  public static KubernetesClient getKubernetesClientByUrl(
+      String k8sMasterUrl, String k8sUsername, String k8sPassword) {
+    Config config =
+        new ConfigBuilder()
+            .withMasterUrl(k8sMasterUrl)
+            .withUsername(k8sUsername)
+            .withPassword(k8sPassword)
+            .build();
+    return new DefaultKubernetesClient(config);
+  }
+
+  public static KubernetesClient getKubernetesClientByUrl(String k8sMasterUrl) {
+    Config config = new ConfigBuilder().withMasterUrl(k8sMasterUrl).build();
+    return new DefaultKubernetesClient(config);
+  }
+
+  public static KubernetesClient getKubernetesClient(
+      String kubeConfigFile, String k8sMasterUrl, String k8sUsername, String k8sPassword) {
+    // The kubeconfig file mode takes precedence when provided
+    if (StringUtils.isNotBlank(kubeConfigFile)) {
+      return getKubernetesClientByKubeConfigFile(kubeConfigFile);
+    }
+
+    if (StringUtils.isNotBlank(k8sMasterUrl)
+        && StringUtils.isNotBlank(k8sUsername)
+        && StringUtils.isNotBlank(k8sPassword)) {
+      return getKubernetesClientByUrl(k8sMasterUrl, k8sUsername, k8sPassword);
+    }
+
+    if (StringUtils.isNotBlank(k8sMasterUrl)) {
+      return getKubernetesClientByUrl(k8sMasterUrl);
+    }
+
+    throw new KubernetesClientException(
+        "Both kubeConfigFile and k8sMasterUrl are empty. Initializing KubernetesClient failed.");
+  }
+
+  public static KubernetesClient getKubernetesClientByKubeConfigFile(String kubeConfigFile) {
+    final Config config;
+
+    if (kubeConfigFile != null) {
+      try {
+        config =
+            Config.fromKubeconfig(null, FileUtils.readFileUtf8(new File(kubeConfigFile)), null);
+      } catch (IOException e) {
+        throw new KubernetesClientException("Load kubernetes config failed.", e);
+      }
+    } else {
+      config = Config.autoConfigure(null);
+    }
+
+    return new DefaultKubernetesClient(config);
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/resources/spark-k8s-operator.md b/linkis-engineconn-plugins/spark/src/main/resources/spark-k8s-operator.md
new file mode 100644
index 000000000..542fbceb9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/resources/spark-k8s-operator.md
@@ -0,0 +1,98 @@
+
+### 1. spark-on-k8s-operator documentation
+
+```text
+https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md
+```
+
+
+### 2. spark-on-k8s-operator installation
+
+```text
+helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
+
+helm install my-release spark-operator/spark-operator --namespace spark-operator --create-namespace  --set webhook.enable=true  
+```
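+
+To verify that the operator came up (a quick sanity check; the pod name depends on the release name chosen above):
+
+```text
+kubectl get pods -n spark-operator
+```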
+
+### 3. spark-on-k8s-operator test task submission
+
+```text
+kubectl apply -f examples/spark-pi.yaml
+```
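+
+If the upstream examples directory is not checked out, a minimal manifest along the lines of examples/spark-pi.yaml looks like the sketch below (the image and jar path follow this commit's defaults; namespace and service account must match your cluster):
+
+```yaml
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+  name: spark-pi
+  namespace: default
+spec:
+  type: Scala
+  mode: cluster
+  image: "apache/spark:v3.2.1"
+  imagePullPolicy: Always
+  mainClass: org.apache.spark.examples.SparkPi
+  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar"
+  sparkVersion: "3.2.1"
+  restartPolicy:
+    type: Never
+  driver:
+    cores: 1
+    memory: "512m"
+    serviceAccount: spark
+  executor:
+    cores: 1
+    instances: 1
+    memory: "512m"
+```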
+
+### 4. Fixing the "serviceaccount not found" error
+
+If task submission fails with an error like `Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "spark-pi-driver" is forbidden: error looking up service account spark/spark: serviceaccount "spark" not found.`, create the service account and bind it to a suitable role:
+
+```text
+kubectl create serviceaccount spark
+
+kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
+```
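+
+A quick way to confirm the binding took effect (the service account and namespace here match the commands above; if the error references a different namespace, such as spark/spark, adjust accordingly):
+
+```text
+kubectl auth can-i create pods --as=system:serviceaccount:default:spark
+```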
+
+### 5. spark-on-k8s-operator uninstallation (usually not required; only after installation problems)
+
+```text
+helm uninstall my-release  --namespace spark-operator
+
+kubectl delete serviceaccounts my-release-spark-operator --namespace spark-operator
+
+kubectl delete clusterrole my-release-spark-operator --namespace spark-operator
+
+kubectl delete clusterrolebindings my-release-spark-operator --namespace spark-operator
+```
+
+### 6. Submitting tasks with the RESTful API
+```text
+POST /api/rest_j/v1/entrance/submit
+```
+
+```json
+{
+  "executionContent": {
+    "spark.app.main.class": "org.apache.spark.examples.SparkPi",
+    "spark.app.args": "spark.app.args",
+    "runType": "jar",
+    "code": "show databases"
+  },
+  "params": {
+    "variable": {
+    },
+    "configuration": {
+      "startup": {
+        "spark.executor.memory": "1g",
+        "spark.driver.memory": "1g",
+        "spark.executor.cores": "1",
+        "spark.app.name": "spark-submit-jar-cjtest",
+        "spark.app.resource": "local:///opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar",
+        "spark.executor.instances": 1,
+        "spark.master": "k8soperator",
+        "spark.k8s.master.url": "http://ip:port",
+        "spark.k8s.username": "username",
+        "spark.k8s.password": "password"
+      }
+    }
+  },
+  "source":  {
+    "scriptPath": "file:///tmp/hadoop/test.sql"
+  },
+  "labels": {
+    "engineType": "spark-3.2.1",
+    "engineConnMode": "once",
+    "userCreator": "linkis-IDE"
+  }
+}
+```
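+
+A sketch of posting this payload with curl (gateway host, port and token are placeholders; token-based authentication is assumed to be enabled on the Linkis gateway):
+
+```text
+curl -X POST "http://<gateway-host>:9001/api/rest_j/v1/entrance/submit" \
+  -H "Content-Type: application/json" \
+  -H "Token-Code: <token>" \
+  -H "Token-User: hadoop" \
+  -d @submit.json
+```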
+
+### 7. Reference documents
+```text
+https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
+
+https://github.com/fabric8io/kubernetes-client/
+
+https://github.com/apple/batch-processing-gateway
+
+https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-introduction-spark-submit-kubernetes-operator
+
+https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-deep-dive-kubernetes-operator-for-spark
+```
+
+
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index dc851c6fb..eb4bbdaa1 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -50,6 +50,18 @@ object SparkConfiguration extends Logging {
   val SPARK_APP_RESOURCE = CommonVars[String]("spark.app.resource", "")
   val SPARK_APP_CONF = CommonVars[String]("spark.extconf", "")
 
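+  // Settings used when submitting Spark tasks through the spark-on-k8s-operator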
+  val SPARK_K8S_CONFIG_FILE = CommonVars[String]("linkis.spark.k8s.config.file", "")
+  val SPARK_K8S_SERVICE_ACCOUNT = CommonVars[String]("linkis.spark.k8s.serviceAccount", "")
+  val SPARK_K8S_MASTER_URL = CommonVars[String]("linkis.spark.k8s.master.url", "")
+  val SPARK_K8S_USERNAME = CommonVars[String]("linkis.spark.k8s.username", "")
+  val SPARK_K8S_PASSWORD = CommonVars[String]("linkis.spark.k8s.password", "")
+  val SPARK_K8S_IMAGE = CommonVars[String]("linkis.spark.k8s.image", "apache/spark:v3.2.1")
+  val SPARK_K8S_IMAGE_PULL_POLICY = CommonVars[String]("linkis.spark.k8s.imagePullPolicy", "Always")
+  val SPARK_K8S_LANGUAGE_TYPE = CommonVars[String]("linkis.spark.k8s.languageType", "Scala")
+  val SPARK_K8S_RESTART_POLICY = CommonVars[String]("linkis.spark.k8s.restartPolicy", "Never")
+  val SPARK_K8S_SPARK_VERSION = CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
+  val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", "default")
+
   val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")
 
   val SPARK_PYTHON_TEST_MODE_ENABLE =
@@ -80,7 +92,7 @@ object SparkConfiguration extends Logging {
     "Map output compression method(map输出结果压缩方式)"
   )
 
-  val SPARK_MASTER = CommonVars[String]("spark.master", "yarn", "Default master(默认master)")
+  val SPARK_MASTER = CommonVars[String]("spark.master", "yarn", "Default master is yarn(默认master为yarn)")
 
   val SPARK_CONSOLE_OUTPUT_NUM = CommonVars[Int]("wds.linkis.spark.output.line.limit", 10)
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
index 9fb68ed4c..d90b52a21 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
@@ -22,7 +22,7 @@ import org.apache.linkis.engineconn.once.executor.{
   OnceExecutorExecutionContext,
   OperableOnceExecutor
 }
-import org.apache.linkis.engineplugin.spark.client.deployment.YarnApplicationClusterDescriptorAdapter
+import org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
 import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
   SPARK_APP_CONF,
   SPARK_APPLICATION_ARGS,
@@ -43,7 +43,7 @@ import scala.concurrent.duration.Duration
 class SparkSubmitOnceExecutor(
     override val id: Long,
     override protected val sparkEngineConnContext: SparkEngineConnContext
-) extends SparkOnceExecutor[YarnApplicationClusterDescriptorAdapter]
+) extends SparkOnceExecutor[ClusterDescriptorAdapter]
     with OperableOnceExecutor {
 
   private var oldProgress: Float = 0f
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index fa77e6ed8..fef3f0699 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -94,7 +94,21 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     val sparkConfig: SparkConfig = new SparkConfig()
     sparkConfig.setJavaHome(variable(Environment.JAVA_HOME))
     sparkConfig.setSparkHome(SPARK_HOME.getValue(options))
-    sparkConfig.setMaster(SPARK_MASTER.getValue(options))
+    val master = SPARK_MASTER.getValue(options)
+    sparkConfig.setMaster(master)
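+    // K8s-specific settings are only read for k8s* masters such as "k8soperator"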
+    if (master.startsWith("k8s")) {
+      sparkConfig.setK8sConfigFile(SPARK_K8S_CONFIG_FILE.getValue(options))
+      sparkConfig.setK8sServiceAccount(SPARK_K8S_SERVICE_ACCOUNT.getValue(options))
+      sparkConfig.setK8sMasterUrl(SPARK_K8S_MASTER_URL.getValue(options))
+      sparkConfig.setK8sUsername(SPARK_K8S_USERNAME.getValue(options))
+      sparkConfig.setK8sPassword(SPARK_K8S_PASSWORD.getValue(options))
+      sparkConfig.setK8sImage(SPARK_K8S_IMAGE.getValue(options))
+      sparkConfig.setK8sNamespace(SPARK_K8S_NAMESPACE.getValue(options))
+      sparkConfig.setK8sSparkVersion(SPARK_K8S_SPARK_VERSION.getValue(options))
+      sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options))
+      sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))
+      sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options))
+    }
     sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
     sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
     sparkConfig.setAppName(SPARK_APP_NAME.getValue(options))
diff --git a/pom.xml b/pom.xml
index f0bfa4c32..1a963da20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,9 @@
     <sqoop.version>1.4.6</sqoop.version>
     <elasticsearch.version>7.6.2</elasticsearch.version>
 
+    <!-- This matches the kubernetes-client version used by Spark 3.2.1 -->
+    <kubernetes-client.version>5.4.1</kubernetes-client.version>
+
     <!-- marshalling -->
     <gson.version>2.8.9</gson.version>
     <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 9f83b2721..67641f5c8 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -37,6 +37,7 @@ aws-java-sdk-core-1.11.792.jar
 aws-java-sdk-kms-1.11.792.jar
 aws-java-sdk-s3-1.11.792.jar
 aws-java-sdk-s3-1.12.261.jar
+automaton-1.11-8.jar
 bcpkix-jdk15on-1.70.jar
 bcprov-jdk15on-1.70.jar
 bcutil-jdk15on-1.70.jar
@@ -165,6 +166,7 @@ guice-4.1.0.jar
 guice-4.2.2.jar
 guice-assistedinject-3.0.jar
 guice-servlet-4.0.jar
+generex-1.0.2.jar
 hadoop-aliyun-3.3.4.jar
 hadoop-annotations-2.7.2.jar
 hadoop-annotations-3.3.4.jar
@@ -350,6 +352,27 @@ knife4j-spring-ui-2.0.9.jar
 kotlin-stdlib-1.3.72.jar
 kotlin-stdlib-common-1.3.72.jar
 kryo-2.24.0.jar
+kubernetes-client-5.4.1.jar
+kubernetes-model-admissionregistration-4.13.2.jar
+kubernetes-model-apiextensions-4.13.2.jar
+kubernetes-model-apps-4.13.2.jar
+kubernetes-model-autoscaling-4.13.2.jar
+kubernetes-model-batch-4.13.2.jar
+kubernetes-model-certificates-4.13.2.jar
+kubernetes-model-common-5.4.1.jar
+kubernetes-model-coordination-4.13.2.jar
+kubernetes-model-core-5.4.1.jar
+kubernetes-model-discovery-4.13.2.jar
+kubernetes-model-events-4.13.2.jar
+kubernetes-model-extensions-4.13.2.jar
+kubernetes-model-flowcontrol-5.4.1.jar
+kubernetes-model-metrics-4.13.2.jar
+kubernetes-model-networking-4.13.2.jar
+kubernetes-model-node-4.13.2.jar
+kubernetes-model-policy-4.13.2.jar
+kubernetes-model-rbac-4.13.2.jar
+kubernetes-model-scheduling-4.13.2.jar
+kubernetes-model-storageclass-4.13.2.jar
 libfb303-0.9.3.jar
 libthrift-0.15.0.pom
 libthrift-0.9.3.jar
@@ -363,6 +386,7 @@ log4j-jcl-2.17.2.jar
 log4j-jul-2.17.2.jar
 log4j-slf4j-impl-2.17.2.jar
 log4j-web-2.17.2.jar
+logging-interceptor-3.14.9.jar
 lz4-java-1.6.0.jar
 lz4-java-1.7.1.jar
 mapstruct-1.3.1.Final.jar
@@ -610,4 +634,5 @@ xz-1.5.jar
 zookeeper-3.5.9.jar
 zookeeper-jute-3.5.9.jar
 zstd-jni-1.4.4-7.jar
-zstd-jni-1.4.5-6.jar
\ No newline at end of file
+zstd-jni-1.4.5-6.jar
+zjsonpatch-0.3.0.jar
\ No newline at end of file

