You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/07/19 09:33:38 UTC
[linkis] branch master updated: Spark supports k8s operator submit task (#4767)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new b14cbf6e7 Spark supports k8s operator submit task (#4767)
b14cbf6e7 is described below
commit b14cbf6e7517d06daa2fad5f4bc42860933ebb69
Author: ChengJie1053 <18...@163.com>
AuthorDate: Wed Jul 19 17:33:32 2023 +0800
Spark supports k8s operator submit task (#4767)
---
linkis-engineconn-plugins/spark/pom.xml | 27 +++
.../spark/client/context/SparkConfig.java | 137 ++++++++++++
.../deployment/ClusterDescriptorAdapter.java | 7 +
.../ClusterDescriptorAdapterFactory.java | 9 +
...KubernetesOperatorClusterDescriptorAdapter.java | 211 ++++++++++++++++++
.../ApplicationState.java} | 25 ++-
.../ConfigMap.java} | 16 +-
.../spark/client/deployment/crds/DriverInfo.java | 58 +++++
.../HostPath.java} | 26 ++-
.../RestartPolicy.java} | 20 +-
.../SparkApplication.java} | 22 +-
.../SparkApplicationList.java} | 13 +-
.../deployment/crds/SparkApplicationSpec.java | 226 +++++++++++++++++++
.../deployment/crds/SparkApplicationStatus.java | 105 +++++++++
.../spark/client/deployment/crds/SparkPodSpec.java | 238 +++++++++++++++++++++
.../Volume.java} | 36 +++-
.../VolumeMount.java} | 26 ++-
.../spark/client/deployment/util/FileUtils.java | 201 +++++++++++++++++
.../client/deployment/util/KubernetesHelper.java | 82 +++++++
.../spark/src/main/resources/spark-k8s-operator.md | 98 +++++++++
.../spark/config/SparkConfiguration.scala | 14 +-
.../spark/executor/SparkSubmitOnceExecutor.scala | 4 +-
.../spark/factory/SparkEngineConnFactory.scala | 16 +-
pom.xml | 3 +
tool/dependencies/known-dependencies.txt | 27 ++-
25 files changed, 1582 insertions(+), 65 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index e5f8d0b05..add427443 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -392,6 +392,33 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ <version>${kubernetes-client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-model-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-model-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-model-common</artifactId>
+ <version>${kubernetes-client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-model-core</artifactId>
+ <version>5.4.1</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 20b0749d8..4e4654f92 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -25,6 +25,28 @@ public class SparkConfig {
private String javaHome; // ("")
private String sparkHome; // ("")
private String master = "yarn"; // ("yarn")
+
+ private String k8sConfigFile;
+
+ private String k8sServiceAccount;
+
+ private String k8sMasterUrl;
+
+ private String k8sUsername;
+
+ private String k8sPassword;
+
+ private String k8sImage;
+
+ private String k8sImagePullPolicy;
+
+ private String k8sLanguageType;
+
+ private String k8sRestartPolicy;
+
+ private String k8sSparkVersion;
+
+ private String k8sNamespace;
private String deployMode = "client"; // ("client") // todo cluster
private String appResource; // ("")
private String appName; // ("")
@@ -51,6 +73,94 @@ public class SparkConfig {
private String keytab; // ("--keytab", "")
private String queue; // ("--queue", "")
+ public String getK8sImagePullPolicy() {
+ return k8sImagePullPolicy;
+ }
+
+ public void setK8sImagePullPolicy(String k8sImagePullPolicy) {
+ this.k8sImagePullPolicy = k8sImagePullPolicy;
+ }
+
+ public String getK8sLanguageType() {
+ return k8sLanguageType;
+ }
+
+ public void setK8sLanguageType(String k8sLanguageType) {
+ this.k8sLanguageType = k8sLanguageType;
+ }
+
+ public String getK8sRestartPolicy() {
+ return k8sRestartPolicy;
+ }
+
+ public void setK8sRestartPolicy(String k8sRestartPolicy) {
+ this.k8sRestartPolicy = k8sRestartPolicy;
+ }
+
+ public String getK8sSparkVersion() {
+ return k8sSparkVersion;
+ }
+
+ public void setK8sSparkVersion(String k8sSparkVersion) {
+ this.k8sSparkVersion = k8sSparkVersion;
+ }
+
+ public String getK8sNamespace() {
+ return k8sNamespace;
+ }
+
+ public void setK8sNamespace(String k8sNamespace) {
+ this.k8sNamespace = k8sNamespace;
+ }
+
+ public String getK8sMasterUrl() {
+ return k8sMasterUrl;
+ }
+
+ public String getK8sConfigFile() {
+ return k8sConfigFile;
+ }
+
+ public void setK8sConfigFile(String k8sConfigFile) {
+ this.k8sConfigFile = k8sConfigFile;
+ }
+
+ public String getK8sServiceAccount() {
+ return k8sServiceAccount;
+ }
+
+ public void setK8sServiceAccount(String k8sServiceAccount) {
+ this.k8sServiceAccount = k8sServiceAccount;
+ }
+
+ public void setK8sMasterUrl(String k8sMasterUrl) {
+ this.k8sMasterUrl = k8sMasterUrl;
+ }
+
+ public String getK8sUsername() {
+ return k8sUsername;
+ }
+
+ public void setK8sUsername(String k8sUsername) {
+ this.k8sUsername = k8sUsername;
+ }
+
+ public String getK8sPassword() {
+ return k8sPassword;
+ }
+
+ public void setK8sPassword(String k8sPassword) {
+ this.k8sPassword = k8sPassword;
+ }
+
+ public String getK8sImage() {
+ return k8sImage;
+ }
+
+ public void setK8sImage(String k8sImage) {
+ this.k8sImage = k8sImage;
+ }
+
public String getJavaHome() {
return javaHome;
}
@@ -287,6 +397,33 @@ public class SparkConfig {
+ ", master='"
+ master
+ '\''
+ + ", k8sConfigFile='"
+ + k8sConfigFile
+ + '\''
+ + ", k8sServiceAccount='"
+ + k8sServiceAccount
+ + '\''
+ + ", k8sMasterUrl='"
+ + k8sMasterUrl
+ + '\''
+ + ", k8sImage='"
+ + k8sImage
+ + '\''
+ + ", k8sImagePullPolicy='"
+ + k8sImagePullPolicy
+ + '\''
+ + ", k8sLanguageType='"
+ + k8sLanguageType
+ + '\''
+ + ", k8sRestartPolicy='"
+ + k8sRestartPolicy
+ + '\''
+ + ", k8sSparkVersion='"
+ + k8sSparkVersion
+ + '\''
+ + ", k8sNamespace='"
+ + k8sNamespace
+ + '\''
+ ", deployMode='"
+ deployMode
+ '\''
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
index 16b7613cf..1ee44b27c 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
@@ -23,6 +23,8 @@ import org.apache.spark.launcher.CustomSparkSubmitLauncher;
import org.apache.spark.launcher.SparkAppHandle;
import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +60,11 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
return "ClusterDescriptorAdapter{" + "applicationId=" + sparkAppHandle.getAppId() + '}';
}
+ public void deployCluster(String mainClass, String args, Map<String, String> confMap)
+ throws IOException {}
+
+ public abstract boolean initJobId();
+
@Override
public void close() {
if (sparkAppHandle != null) {
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
index 8e9540dcb..91d3eafb6 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
@@ -19,11 +19,20 @@ package org.apache.linkis.engineplugin.spark.client.deployment;
import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import org.apache.commons.lang3.StringUtils;
+
public class ClusterDescriptorAdapterFactory {
public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
+ String master = executionContext.getSparkConfig().getMaster();
+
ClusterDescriptorAdapter clusterDescriptorAdapter =
new YarnApplicationClusterDescriptorAdapter(executionContext);
+
+ if (StringUtils.isNotBlank(master) && master.equalsIgnoreCase("k8s-operator")) {
+ clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext);
+ }
+
return clusterDescriptorAdapter;
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
new file mode 100644
index 000000000..c77fc655c
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment;
+
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
+import org.apache.linkis.engineplugin.spark.client.deployment.crds.*;
+import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition;
+import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+public class KubernetesOperatorClusterDescriptorAdapter extends ClusterDescriptorAdapter {
+
+ protected SparkConfig sparkConfig;
+ protected KubernetesClient client;
+
+ public KubernetesOperatorClusterDescriptorAdapter(ExecutionContext executionContext) {
+ super(executionContext);
+ this.sparkConfig = executionContext.getSparkConfig();
+ this.client =
+ KubernetesHelper.getKubernetesClient(
+ this.sparkConfig.getK8sConfigFile(),
+ this.sparkConfig.getK8sMasterUrl(),
+ this.sparkConfig.getK8sUsername(),
+ this.sparkConfig.getK8sPassword());
+ }
+
+ public void deployCluster(String mainClass, String args, Map<String, String> confMap) {
+
+ CustomResourceDefinitionList crds =
+ client.apiextensions().v1().customResourceDefinitions().list();
+
+ String sparkApplicationCRDName = CustomResource.getCRDName(SparkApplication.class);
+ List<CustomResourceDefinition> sparkCRDList =
+ crds.getItems().stream()
+ .filter(crd -> crd.getMetadata().getName().equals(sparkApplicationCRDName))
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(sparkCRDList)) {
+ throw new RuntimeException("The Spark operator crd does not exist");
+ }
+
+ NonNamespaceOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
+ sparkApplicationClient = getSparkApplicationClient(client);
+ SparkApplication sparkApplication =
+ getSparkApplication(sparkConfig.getAppName(), sparkConfig.getK8sNamespace());
+
+ SparkPodSpec driver =
+ SparkPodSpec.Builder()
+ .cores(sparkConfig.getDriverCores())
+ .memory(sparkConfig.getDriverMemory())
+ .serviceAccount(sparkConfig.getK8sServiceAccount())
+ .build();
+ SparkPodSpec executor =
+ SparkPodSpec.Builder()
+ .cores(sparkConfig.getExecutorCores())
+ .instances(sparkConfig.getNumExecutors())
+ .memory(sparkConfig.getExecutorMemory())
+ .build();
+ SparkApplicationSpec sparkApplicationSpec =
+ SparkApplicationSpec.Builder()
+ .type(sparkConfig.getK8sLanguageType())
+ // todo An error occurs when the client mode is used. The cause has not been found
+ .mode("cluster")
+ .image(sparkConfig.getK8sImage())
+ .imagePullPolicy(sparkConfig.getK8sImagePullPolicy())
+ .mainClass(mainClass)
+ .mainApplicationFile(sparkConfig.getAppResource())
+ .sparkVersion(sparkConfig.getK8sSparkVersion())
+ .restartPolicy(new RestartPolicy(sparkConfig.getK8sRestartPolicy()))
+ .driver(driver)
+ .executor(executor)
+ .build();
+
+ sparkApplication.setSpec(sparkApplicationSpec);
+ SparkApplication created = sparkApplicationClient.createOrReplace(sparkApplication);
+
+ // Wait three seconds to get the status
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+
+ }
+
+ SparkApplicationList list = getSparkApplicationClient(client).list();
+
+ List<SparkApplication> sparkApplicationList =
+ list.getItems().stream()
+ .filter(crd -> crd.getMetadata().getName().equals(sparkConfig.getAppName()))
+ .collect(Collectors.toList());
+
+ if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
+ SparkApplicationStatus status = sparkApplicationList.get(0).getStatus();
+ if (Objects.nonNull(status)) {
+ logger.info(
+ "Spark k8s task: {},status: {}",
+ sparkConfig.getAppName(),
+ status.getApplicationState().getState());
+ }
+ }
+ }
+
+ public boolean initJobId() {
+ SparkApplicationStatus sparkApplicationStatus = getKubernetesOperatorState();
+
+ if (Objects.nonNull(sparkApplicationStatus)) {
+ this.applicationId = sparkApplicationStatus.getSparkApplicationId();
+ this.jobState =
+ kubernetesOperatorStateConvertSparkState(
+ sparkApplicationStatus.getApplicationState().getState());
+ }
+
+ // When the job is not finished, the appId is monitored; otherwise, the status is
+ // monitored(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待)
+ return null != getApplicationId() || (jobState != null && jobState.isFinal());
+ }
+
+ private SparkApplicationStatus getKubernetesOperatorState() {
+ List<SparkApplication> sparkApplicationList =
+ getSparkApplicationClient(client).list().getItems();
+ if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
+ for (SparkApplication sparkApplication : sparkApplicationList) {
+ if (sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace())
+ && sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName())) {
+ return sparkApplication.getStatus();
+ }
+ }
+ }
+ return null;
+ }
+
+ public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String kubernetesState) {
+ if (StringUtils.isBlank(kubernetesState)) {
+ return SparkAppHandle.State.UNKNOWN;
+ }
+ switch (kubernetesState) {
+ case "PENDING":
+ return SparkAppHandle.State.CONNECTED;
+ case "RUNNING":
+ return SparkAppHandle.State.RUNNING;
+ case "COMPLETED":
+ return SparkAppHandle.State.FINISHED;
+ case "FAILED":
+ return SparkAppHandle.State.FAILED;
+ default:
+ return SparkAppHandle.State.UNKNOWN;
+ }
+ }
+
+ public boolean isDisposed() {
+ return this.jobState.isFinal();
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterDescriptorAdapter{" + "applicationId=" + getApplicationId() + '}';
+ }
+
+ @Override
+ public void close() {
+ logger.info("Start to close job {}.", getApplicationId());
+ SparkApplication SparkApplication =
+ getSparkApplication(this.sparkConfig.getAppName(), this.sparkConfig.getK8sNamespace());
+ getSparkApplicationClient(client).delete(SparkApplication);
+ client.close();
+ }
+
+ public static NonNamespaceOperation<
+ SparkApplication, SparkApplicationList, Resource<SparkApplication>>
+ getSparkApplicationClient(KubernetesClient client) {
+ return client.customResources(SparkApplication.class, SparkApplicationList.class);
+ }
+
+ public static SparkApplication getSparkApplication(String sparkOperatorName, String namespace) {
+ SparkApplication sparkApplication = new SparkApplication();
+ ObjectMeta metadata = new ObjectMeta();
+ metadata.setName(sparkOperatorName);
+ metadata.setNamespace(namespace);
+ sparkApplication.setMetadata(metadata);
+ return sparkApplication;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ApplicationState.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ApplicationState.java
index 8e9540dcb..ac59f75be 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ApplicationState.java
@@ -15,15 +15,26 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class ApplicationState {
-public class ClusterDescriptorAdapterFactory {
+ private String state;
+ private String errorMessage;
- public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- new YarnApplicationClusterDescriptorAdapter(executionContext);
- return clusterDescriptorAdapter;
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ConfigMap.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ConfigMap.java
index 8e9540dcb..4ea04d7c5 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/ConfigMap.java
@@ -15,15 +15,17 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class ConfigMap {
-public class ClusterDescriptorAdapterFactory {
+ private String name;
- public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- new YarnApplicationClusterDescriptorAdapter(executionContext);
- return clusterDescriptorAdapter;
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/DriverInfo.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/DriverInfo.java
new file mode 100644
index 000000000..0484e2390
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/DriverInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+
+public class DriverInfo {
+
+ private String podName;
+ private String webUIAddress;
+ private String webUIPort;
+ private String webUIServiceName;
+
+ public String getWebUIServiceName() {
+ return webUIServiceName;
+ }
+
+ public void setWebUIServiceName(String webUIServiceName) {
+ this.webUIServiceName = webUIServiceName;
+ }
+
+ public String getWebUIPort() {
+ return webUIPort;
+ }
+
+ public void setWebUIPort(String webUIPort) {
+ this.webUIPort = webUIPort;
+ }
+
+ public String getWebUIAddress() {
+ return webUIAddress;
+ }
+
+ public void setWebUIAddress(String webUIAddress) {
+ this.webUIAddress = webUIAddress;
+ }
+
+ public String getPodName() {
+ return podName;
+ }
+
+ public void setPodName(String podName) {
+ this.podName = podName;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/HostPath.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/HostPath.java
index 8e9540dcb..053150352 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/HostPath.java
@@ -15,15 +15,27 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class HostPath {
-public class ClusterDescriptorAdapterFactory {
+ private String path;
- public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- new YarnApplicationClusterDescriptorAdapter(executionContext);
- return clusterDescriptorAdapter;
+ private String type;
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/RestartPolicy.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/RestartPolicy.java
index 8e9540dcb..276505af8 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/RestartPolicy.java
@@ -15,15 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class RestartPolicy {
-public class ClusterDescriptorAdapterFactory {
+ private String type;
- public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- new YarnApplicationClusterDescriptorAdapter(executionContext);
- return clusterDescriptorAdapter;
+ public RestartPolicy(String type) {
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplication.java
similarity index 55%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplication.java
index 8e9540dcb..6a0b143a7 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplication.java
@@ -15,15 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.Kind;
+import io.fabric8.kubernetes.model.annotation.Version;
-public class ClusterDescriptorAdapterFactory {
+@Version(SparkApplication.VERSION)
+@Group(SparkApplication.GROUP)
+@Kind(SparkApplication.Kind)
+public class SparkApplication extends CustomResource<SparkApplicationSpec, SparkApplicationStatus>
+ implements Namespaced {
+ public static final String GROUP = "sparkoperator.k8s.io";
+ public static final String VERSION = "v1beta2";
- public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- new YarnApplicationClusterDescriptorAdapter(executionContext);
- return clusterDescriptorAdapter;
- }
+ public static final String Kind = "SparkApplication";
}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationList.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationList.java
index 8e9540dcb..4e70f6bc1 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationList.java
@@ -15,15 +15,8 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import io.fabric8.kubernetes.client.CustomResourceList;
-public class ClusterDescriptorAdapterFactory {
-
- public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- new YarnApplicationClusterDescriptorAdapter(executionContext);
- return clusterDescriptorAdapter;
- }
-}
+public class SparkApplicationList extends CustomResourceList<SparkApplication> {}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
new file mode 100644
index 000000000..b170233b5
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+
+import java.util.List;
+
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+
+public class SparkApplicationSpec implements KubernetesResource {
+
+ private String type;
+
+ private String mode;
+
+ private String image;
+
+ private String imagePullPolicy;
+
+ private String mainClass;
+
+ private String mainApplicationFile;
+
+ private String sparkVersion;
+
+ private RestartPolicy restartPolicy;
+
+ private List<Volume> volumes;
+
+ private SparkPodSpec driver;
+
+ private SparkPodSpec executor;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
+
+ public String getImage() {
+ return image;
+ }
+
+ public void setImage(String image) {
+ this.image = image;
+ }
+
+ public String getImagePullPolicy() {
+ return imagePullPolicy;
+ }
+
+ public void setImagePullPolicy(String imagePullPolicy) {
+ this.imagePullPolicy = imagePullPolicy;
+ }
+
+ public String getMainClass() {
+ return mainClass;
+ }
+
+ public void setMainClass(String mainClass) {
+ this.mainClass = mainClass;
+ }
+
+ public String getMainApplicationFile() {
+ return mainApplicationFile;
+ }
+
+ public void setMainApplicationFile(String mainApplicationFile) {
+ this.mainApplicationFile = mainApplicationFile;
+ }
+
+ public String getSparkVersion() {
+ return sparkVersion;
+ }
+
+ public void setSparkVersion(String sparkVersion) {
+ this.sparkVersion = sparkVersion;
+ }
+
+ public RestartPolicy getRestartPolicy() {
+ return restartPolicy;
+ }
+
+ public void setRestartPolicy(RestartPolicy restartPolicy) {
+ this.restartPolicy = restartPolicy;
+ }
+
+ public List<Volume> getVolumes() {
+ return volumes;
+ }
+
+ public void setVolumes(List<Volume> volumes) {
+ this.volumes = volumes;
+ }
+
+ public SparkPodSpec getDriver() {
+ return driver;
+ }
+
+ public void setDriver(SparkPodSpec driver) {
+ this.driver = driver;
+ }
+
+ public SparkPodSpec getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(SparkPodSpec executor) {
+ this.executor = executor;
+ }
+
+ public static SparkApplicationSpecBuilder Builder() {
+ return new SparkApplicationSpecBuilder();
+ }
+
+ public static class SparkApplicationSpecBuilder {
+ private String type;
+ private String mode;
+ private String image;
+ private String imagePullPolicy;
+ private String mainClass;
+ private String mainApplicationFile;
+ private String sparkVersion;
+ private RestartPolicy restartPolicy;
+ private List<Volume> volumes;
+ private SparkPodSpec driver;
+ private SparkPodSpec executor;
+
+ private SparkApplicationSpecBuilder() {}
+
+ public SparkApplicationSpecBuilder type(String type) {
+ this.type = type;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder mode(String mode) {
+ this.mode = mode;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder image(String image) {
+ this.image = image;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder imagePullPolicy(String imagePullPolicy) {
+ this.imagePullPolicy = imagePullPolicy;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder mainClass(String mainClass) {
+ this.mainClass = mainClass;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder mainApplicationFile(String mainApplicationFile) {
+ this.mainApplicationFile = mainApplicationFile;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder sparkVersion(String sparkVersion) {
+ this.sparkVersion = sparkVersion;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder restartPolicy(RestartPolicy restartPolicy) {
+ this.restartPolicy = restartPolicy;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder volumes(List<Volume> volumes) {
+ this.volumes = volumes;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder driver(SparkPodSpec driver) {
+ this.driver = driver;
+ return this;
+ }
+
+ public SparkApplicationSpecBuilder executor(SparkPodSpec executor) {
+ this.executor = executor;
+ return this;
+ }
+
+ public SparkApplicationSpec build() {
+ SparkApplicationSpec sparkApplicationSpec = new SparkApplicationSpec();
+ sparkApplicationSpec.type = this.type;
+ sparkApplicationSpec.mainClass = this.mainClass;
+ sparkApplicationSpec.imagePullPolicy = this.imagePullPolicy;
+ sparkApplicationSpec.volumes = this.volumes;
+ sparkApplicationSpec.driver = this.driver;
+ sparkApplicationSpec.sparkVersion = this.sparkVersion;
+ sparkApplicationSpec.mode = this.mode;
+ sparkApplicationSpec.mainApplicationFile = this.mainApplicationFile;
+ sparkApplicationSpec.executor = this.executor;
+ sparkApplicationSpec.image = this.image;
+ sparkApplicationSpec.restartPolicy = this.restartPolicy;
+ return sparkApplicationSpec;
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationStatus.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationStatus.java
new file mode 100644
index 000000000..c7668ea33
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationStatus.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+
+import java.util.Map;
+
+public class SparkApplicationStatus {
+
+ private String sparkApplicationId;
+ private String terminationTime;
+ private String lastSubmissionAttemptTime;
+ private String submissionID;
+ private ApplicationState applicationState;
+ private Integer executionAttempts;
+ private Integer submissionAttempts;
+ private DriverInfo driverInfo;
+ private Map<String, String> executorState;
+
+ public String getSubmissionID() {
+ return submissionID;
+ }
+
+ public void setSubmissionID(String submissionID) {
+ this.submissionID = submissionID;
+ }
+
+ public Integer getSubmissionAttempts() {
+ return submissionAttempts;
+ }
+
+ public void setSubmissionAttempts(Integer submissionAttempts) {
+ this.submissionAttempts = submissionAttempts;
+ }
+
+ public String getLastSubmissionAttemptTime() {
+ return lastSubmissionAttemptTime;
+ }
+
+ public void setLastSubmissionAttemptTime(String lastSubmissionAttemptTime) {
+ this.lastSubmissionAttemptTime = lastSubmissionAttemptTime;
+ }
+
+ public String getSparkApplicationId() {
+ return sparkApplicationId;
+ }
+
+ public void setSparkApplicationId(String sparkApplicationId) {
+ this.sparkApplicationId = sparkApplicationId;
+ }
+
+ public String getTerminationTime() {
+ return terminationTime;
+ }
+
+ public void setTerminationTime(String terminationTime) {
+ this.terminationTime = terminationTime;
+ }
+
+ public ApplicationState getApplicationState() {
+ return applicationState;
+ }
+
+ public void setApplicationState(ApplicationState applicationState) {
+ this.applicationState = applicationState;
+ }
+
+ public Integer getExecutionAttempts() {
+ return executionAttempts;
+ }
+
+ public void setExecutionAttempts(Integer executionAttempts) {
+ this.executionAttempts = executionAttempts;
+ }
+
+ public DriverInfo getDriverInfo() {
+ return driverInfo;
+ }
+
+ public void setDriverInfo(DriverInfo driverInfo) {
+ this.driverInfo = driverInfo;
+ }
+
+ public Map<String, String> getExecutorState() {
+ return executorState;
+ }
+
+ public void setExecutorState(Map<String, String> executorState) {
+ this.executorState = executorState;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkPodSpec.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkPodSpec.java
new file mode 100644
index 000000000..6c7b281a9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkPodSpec.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
+
+import java.util.List;
+import java.util.Map;
+
+public class SparkPodSpec {
+
+ private Integer cores;
+
+ private String coreLimit;
+
+ private String memory;
+
+ private Map<String, String> labels;
+
+ private Map<String, String> annotations;
+
+ private String serviceAccount;
+
+ private List<VolumeMount> volumeMounts;
+
+ private Integer instances;
+
+ public Integer getCores() {
+ return cores;
+ }
+
+ public void setCores(Integer cores) {
+ this.cores = cores;
+ }
+
+ public String getCoreLimit() {
+ return coreLimit;
+ }
+
+ public void setCoreLimit(String coreLimit) {
+ this.coreLimit = coreLimit;
+ }
+
+ public String getMemory() {
+ return memory;
+ }
+
+ public void setMemory(String memory) {
+ this.memory = memory;
+ }
+
+ public Map<String, String> getLabels() {
+ return labels;
+ }
+
+ public void setLabels(Map<String, String> labels) {
+ this.labels = labels;
+ }
+
+ public Map<String, String> getAnnotations() {
+ return annotations;
+ }
+
+ public void setAnnotations(Map<String, String> annotations) {
+ this.annotations = annotations;
+ }
+
+ public String getServiceAccount() {
+ return serviceAccount;
+ }
+
+ public void setServiceAccount(String serviceAccount) {
+ this.serviceAccount = serviceAccount;
+ }
+
+ public List<VolumeMount> getVolumeMounts() {
+ return volumeMounts;
+ }
+
+ public void setVolumeMounts(List<VolumeMount> volumeMounts) {
+ this.volumeMounts = volumeMounts;
+ }
+
+ public Integer getInstances() {
+ return instances;
+ }
+
+ public void setInstances(Integer instances) {
+ this.instances = instances;
+ }
+
+ public static SparkPodSpecBuilder Builder() {
+ return new SparkPodSpecBuilder();
+ }
+
+ public static class SparkPodSpecBuilder {
+ private Integer cores;
+ private String coreLimit;
+ private String memory;
+ private Map<String, String> labels;
+ private Map<String, String> annotations;
+ private String serviceAccount;
+ private List<VolumeMount> volumeMounts;
+ private Integer instances;
+
+ public Integer getCores() {
+ return cores;
+ }
+
+ public void setCores(Integer cores) {
+ this.cores = cores;
+ }
+
+ public String getCoreLimit() {
+ return coreLimit;
+ }
+
+ public void setCoreLimit(String coreLimit) {
+ this.coreLimit = coreLimit;
+ }
+
+ public String getMemory() {
+ return memory;
+ }
+
+ public void setMemory(String memory) {
+ this.memory = memory;
+ }
+
+ public Map<String, String> getLabels() {
+ return labels;
+ }
+
+ public void setLabels(Map<String, String> labels) {
+ this.labels = labels;
+ }
+
+ public Map<String, String> getAnnotations() {
+ return annotations;
+ }
+
+ public void setAnnotations(Map<String, String> annotations) {
+ this.annotations = annotations;
+ }
+
+ public String getServiceAccount() {
+ return serviceAccount;
+ }
+
+ public void setServiceAccount(String serviceAccount) {
+ this.serviceAccount = serviceAccount;
+ }
+
+ public List<VolumeMount> getVolumeMounts() {
+ return volumeMounts;
+ }
+
+ public void setVolumeMounts(List<VolumeMount> volumeMounts) {
+ this.volumeMounts = volumeMounts;
+ }
+
+ public Integer getInstances() {
+ return instances;
+ }
+
+ public void setInstances(Integer instances) {
+ this.instances = instances;
+ }
+
+ private SparkPodSpecBuilder() {}
+
+ public SparkPodSpecBuilder cores(Integer cores) {
+ this.cores = cores;
+ return this;
+ }
+
+ public SparkPodSpecBuilder coreLimit(String coreLimit) {
+ this.coreLimit = coreLimit;
+ return this;
+ }
+
+ public SparkPodSpecBuilder memory(String memory) {
+ this.memory = memory;
+ return this;
+ }
+
+ public SparkPodSpecBuilder labels(Map<String, String> labels) {
+ this.labels = labels;
+ return this;
+ }
+
+ public SparkPodSpecBuilder annotations(Map<String, String> annotations) {
+ this.annotations = annotations;
+ return this;
+ }
+
+ public SparkPodSpecBuilder serviceAccount(String serviceAccount) {
+ this.serviceAccount = serviceAccount;
+ return this;
+ }
+
+ public SparkPodSpecBuilder volumeMounts(List<VolumeMount> volumeMounts) {
+ this.volumeMounts = volumeMounts;
+ return this;
+ }
+
+ public SparkPodSpecBuilder instances(Integer instances) {
+ this.instances = instances;
+ return this;
+ }
+
+ public SparkPodSpec build() {
+ SparkPodSpec sparkPodSpec = new SparkPodSpec();
+ sparkPodSpec.annotations = this.annotations;
+ sparkPodSpec.coreLimit = this.coreLimit;
+ sparkPodSpec.instances = this.instances;
+ sparkPodSpec.labels = this.labels;
+ sparkPodSpec.serviceAccount = this.serviceAccount;
+ sparkPodSpec.cores = this.cores;
+ sparkPodSpec.memory = this.memory;
+ sparkPodSpec.volumeMounts = this.volumeMounts;
+ return sparkPodSpec;
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/Volume.java
similarity index 58%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/Volume.java
index 8e9540dcb..4410fe80f 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/Volume.java
@@ -15,15 +15,37 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class Volume {
-public class ClusterDescriptorAdapterFactory {
+ private String name;
- public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- new YarnApplicationClusterDescriptorAdapter(executionContext);
- return clusterDescriptorAdapter;
+ private HostPath hostPath;
+
+ private ConfigMap configMap;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public HostPath getHostPath() {
+ return hostPath;
+ }
+
+ public void setHostPath(HostPath hostPath) {
+ this.hostPath = hostPath;
+ }
+
+ public ConfigMap getConfigMap() {
+ return configMap;
+ }
+
+ public void setConfigMap(ConfigMap configMap) {
+ this.configMap = configMap;
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/VolumeMount.java
similarity index 64%
copy from linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/VolumeMount.java
index 8e9540dcb..a4b2fb70c 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/VolumeMount.java
@@ -15,15 +15,27 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.client.deployment;
+package org.apache.linkis.engineplugin.spark.client.deployment.crds;
-import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+public class VolumeMount {
-public class ClusterDescriptorAdapterFactory {
+ private String name;
- public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- new YarnApplicationClusterDescriptorAdapter(executionContext);
- return clusterDescriptorAdapter;
+ private String mountPath;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getMountPath() {
+ return mountPath;
+ }
+
+ public void setMountPath(String mountPath) {
+ this.mountPath = mountPath;
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/FileUtils.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/FileUtils.java
new file mode 100644
index 000000000..2b311b91b
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/FileUtils.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.Stream;
+
+/**
+ * This is a utility class to deal files and directories. Contains utilities for recursive deletion
+ * and creation of temporary files.
+ */
+public final class FileUtils {
+
+ /** Global lock to prevent concurrent directory deletes under Windows and MacOS. */
+ private static final Object DELETE_LOCK = new Object();
+
+ /** The alphabet to construct the random part of the filename from. */
+ private static final char[] ALPHABET = {
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f'
+ };
+
+ /** The length of the random part of the filename. */
+ private static final int RANDOM_FILE_NAME_LENGTH = 12;
+
+ /**
+ * The maximum size of array to allocate for reading. See {@code MAX_BUFFER_SIZE} in {@link Files}
+ * for more.
+ */
+ private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+ /** The size of the buffer used for reading. */
+ private static final int BUFFER_SIZE = 4096;
+
+ private static final String JAR_FILE_EXTENSION = "jar";
+
+ public static final String CLASS_FILE_EXTENSION = "class";
+
+ public static final String PACKAGE_SEPARATOR = ".";
+
+ // ------------------------------------------------------------------------
+
+ public static void writeCompletely(WritableByteChannel channel, ByteBuffer src)
+ throws IOException {
+ while (src.hasRemaining()) {
+ channel.write(src);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Lists the given directory in a resource-leak-safe way. */
+ public static Path[] listDirectory(Path directory) throws IOException {
+ try (Stream<Path> stream = Files.list(directory)) {
+ return stream.toArray(Path[]::new);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Constructs a random filename with the given prefix and a random part generated from hex
+ * characters.
+ *
+ * @param prefix the prefix to the filename to be constructed
+ * @return the generated random filename with the given prefix
+ */
+ public static String getRandomFilename(final String prefix) {
+ final Random rnd = new Random();
+ final StringBuilder stringBuilder = new StringBuilder(prefix);
+
+ for (int i = 0; i < RANDOM_FILE_NAME_LENGTH; i++) {
+ stringBuilder.append(ALPHABET[rnd.nextInt(ALPHABET.length)]);
+ }
+
+ return stringBuilder.toString();
+ }
+
+ // ------------------------------------------------------------------------
+ // Simple reading and writing of files
+ // ------------------------------------------------------------------------
+
+ public static String readFile(File file, String charsetName) throws IOException {
+ byte[] bytes = readAllBytes(file.toPath());
+ return new String(bytes, charsetName);
+ }
+
+ public static String readFileUtf8(File file) throws IOException {
+ return readFile(file, "UTF-8");
+ }
+
+ public static void writeFile(File file, String contents, String encoding) throws IOException {
+ byte[] bytes = contents.getBytes(encoding);
+ Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
+ }
+
+ public static void writeFileUtf8(File file, String contents) throws IOException {
+ writeFile(file, contents, "UTF-8");
+ }
+
+ /**
+ * Reads all the bytes from a file. The method ensures that the file is closed when all bytes have
+ * been read or an I/O error, or other runtime exception, is thrown.
+ *
+ * <p>This is an implementation that follow {@link Files#readAllBytes(Path)}, and the difference
+ * is that it limits the size of the direct buffer to avoid direct-buffer OutOfMemoryError. When
+ * {@link Files#readAllBytes(Path)} or other interfaces in java API can do this in the future, we
+ * should remove it.
+ *
+ * @param path the path to the file
+ * @return a byte array containing the bytes read from the file
+ * @throws IOException if an I/O error occurs reading from the stream
+ * @throws OutOfMemoryError if an array of the required size cannot be allocated, for example the
+ * file is larger that {@code 2GB}
+ */
+ public static byte[] readAllBytes(Path path) throws IOException {
+ try (SeekableByteChannel channel = Files.newByteChannel(path);
+ InputStream in = Channels.newInputStream(channel)) {
+
+ long size = channel.size();
+ if (size > (long) MAX_BUFFER_SIZE) {
+ throw new OutOfMemoryError("Required array size too large");
+ }
+
+ return read(in, (int) size);
+ }
+ }
+
+ /**
+ * Reads all the bytes from an input stream. Uses {@code initialSize} as a hint about how many
+ * bytes the stream will have and uses {@code directBufferSize} to limit the size of the direct
+ * buffer used to read.
+ *
+ * @param source the input stream to read from
+ * @param initialSize the initial size of the byte array to allocate
+ * @return a byte array containing the bytes read from the file
+ * @throws IOException if an I/O error occurs reading from the stream
+ * @throws OutOfMemoryError if an array of the required size cannot be allocated
+ */
+ private static byte[] read(InputStream source, int initialSize) throws IOException {
+ int capacity = initialSize;
+ byte[] buf = new byte[capacity];
+ int nread = 0;
+ int n;
+
+ for (; ; ) {
+ // read to EOF which may read more or less than initialSize (eg: file
+ // is truncated while we are reading)
+ while ((n = source.read(buf, nread, Math.min(capacity - nread, BUFFER_SIZE))) > 0) {
+ nread += n;
+ }
+
+ // if last call to source.read() returned -1, we are done
+ // otherwise, try to read one more byte; if that failed we're done too
+ if (n < 0 || (n = source.read()) < 0) {
+ break;
+ }
+
+ // one more byte was read; need to allocate a larger buffer
+ if (capacity <= MAX_BUFFER_SIZE - capacity) {
+ capacity = Math.max(capacity << 1, BUFFER_SIZE);
+ } else {
+ if (capacity == MAX_BUFFER_SIZE) {
+ throw new OutOfMemoryError("Required array size too large");
+ }
+ capacity = MAX_BUFFER_SIZE;
+ }
+ buf = Arrays.copyOf(buf, capacity);
+ buf[nread++] = (byte) n;
+ }
+ return (capacity == nread) ? buf : Arrays.copyOf(buf, nread);
+ }
+
+ /** Private default constructor to avoid instantiation. */
+ private FileUtils() {}
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/KubernetesHelper.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/KubernetesHelper.java
new file mode 100644
index 000000000..04b82a843
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/util/KubernetesHelper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+
+import io.fabric8.kubernetes.client.*;
+
+public class KubernetesHelper {
+
+ public static KubernetesClient getKubernetesClientByUrl(
+ String k8sMasterUrl, String k8sUsername, String k8sPassword) {
+ Config config =
+ new ConfigBuilder()
+ .withMasterUrl(k8sMasterUrl)
+ .withUsername(k8sUsername)
+ .withPassword(k8sPassword)
+ .build();
+ return new DefaultKubernetesClient(config);
+ }
+
+ public static KubernetesClient getKubernetesClientByUrl(String k8sMasterUrl) {
+ Config config = new ConfigBuilder().withMasterUrl(k8sMasterUrl).build();
+ return new DefaultKubernetesClient(config);
+ }
+
+ public static KubernetesClient getKubernetesClient(
+ String kubeConfigFile, String k8sMasterUrl, String k8sUsername, String k8sPassword) {
+ // The ConfigFile mode is preferred
+ if (StringUtils.isNotBlank(kubeConfigFile)) {
+ return getKubernetesClientByKubeConfigFile(kubeConfigFile);
+ }
+
+ if (StringUtils.isNotBlank(k8sMasterUrl)
+ && StringUtils.isNotBlank(k8sUsername)
+ && StringUtils.isNotBlank(k8sPassword)) {
+ return getKubernetesClientByUrl(k8sMasterUrl, k8sUsername, k8sPassword);
+ }
+
+ if (StringUtils.isNotBlank(k8sMasterUrl)) {
+ return getKubernetesClientByUrl(k8sMasterUrl);
+ }
+
+ throw new KubernetesClientException(
+ "Both kubeConfigFile and k8sMasterUrl are empty. Initializing KubernetesClient failed.");
+ }
+
+ public static KubernetesClient getKubernetesClientByKubeConfigFile(String kubeConfigFile) {
+ final Config config;
+
+ if (kubeConfigFile != null) {
+ try {
+ config =
+ Config.fromKubeconfig(null, FileUtils.readFileUtf8(new File(kubeConfigFile)), null);
+ } catch (IOException e) {
+ throw new KubernetesClientException("Load kubernetes config failed.", e);
+ }
+ } else {
+ config = Config.autoConfigure(null);
+ }
+
+ return new DefaultKubernetesClient(config);
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/resources/spark-k8s-operator.md b/linkis-engineconn-plugins/spark/src/main/resources/spark-k8s-operator.md
new file mode 100644
index 000000000..542fbceb9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/resources/spark-k8s-operator.md
@@ -0,0 +1,98 @@
+
+### 1. spark-on-k8s-operator document
+
+```text
+https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md
+```
+
+
+### 2. spark-on-k8s-operator install
+
+```text
+helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
+
+helm install my-release spark-operator/spark-operator --namespace spark-operator --create-namespace --set webhook.enable=true
+```
+
+### 3. spark-on-k8s-operator test task submit
+
+```text
+kubectl apply -f examples/spark-pi.yaml
+```
+
+### 4. If an error is reported: Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "spark-pi-driver" is forbidden: error looking up service account spark/spark: serviceaccount "spark" not found.
+
+```text
+kubectl create serviceaccount spark
+
+kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
+```
+
+### 5. spark-on-k8s-operator Uninstall (usually not required, uninstall after installation problems)
+
+```text
+helm uninstall my-release --namespace spark-operator
+
+kubectl delete serviceaccounts my-release-spark-operator --namespace spark-operator
+
+kubectl delete clusterrole my-release-spark-operator --namespace spark-operator
+
+kubectl delete clusterrolebindings my-release-spark-operator --namespace spark-operator
+```
+
+### 6. Submitting tasks with Restful API
+```text
+POST /api/rest_j/v1/entrance/submit
+```
+
+```json
+{
+ "executionContent": {
+ "spark.app.main.class": "org.apache.spark.examples.SparkPi",
+ "spark.app.args": "spark.app.args",
+ "runType": "jar",
+ "code": "show databases"
+ },
+ "params": {
+ "variable": {
+ },
+ "configuration": {
+ "startup": {
+ "spark.executor.memory": "1g",
+ "spark.driver.memory": "1g",
+ "spark.executor.cores": "1",
+ "spark.app.name": "spark-submit-jar-cjtest",
+ "spark.app.resource": "local:///opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar",
+ "spark.executor.instances": 1,
+ "spark.master": "k8soperator",
+ "spark.k8s.master.url": "http://ip:port",
+ "spark.k8s.username": "username",
+ "spark.k8s.password": "password"
+ }
+ }
+ },
+ "source": {
+ "scriptPath": "file:///tmp/hadoop/test.sql"
+ },
+ "labels": {
+ "engineType": "spark-3.2.1",
+ "engineConnMode": "once",
+ "userCreator": "linkis-IDE"
+ }
+}
+```
+
+### 7. Reference document
+```text
+https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
+
+https://github.com/fabric8io/kubernetes-client/
+
+https://github.com/apple/batch-processing-gateway
+
+https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-introduction-spark-submit-kubernetes-operator
+
+https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-deep-dive-kubernetes-operator-for-spark
+```
+
+
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index dc851c6fb..eb4bbdaa1 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -50,6 +50,18 @@ object SparkConfiguration extends Logging {
val SPARK_APP_RESOURCE = CommonVars[String]("spark.app.resource", "")
val SPARK_APP_CONF = CommonVars[String]("spark.extconf", "")
+ val SPARK_K8S_CONFIG_FILE = CommonVars[String]("linkis.spark.k8s.config.file", "")
+ val SPARK_K8S_SERVICE_ACCOUNT = CommonVars[String]("linkis.spark.k8s.serviceAccount", "")
+ val SPARK_K8S_MASTER_URL = CommonVars[String]("linkis.spark.k8s.master.url", "")
+ val SPARK_K8S_USERNAME = CommonVars[String]("linkis.spark.k8s.username", "")
+ val SPARK_K8S_PASSWORD = CommonVars[String]("linkis.spark.k8s.password", "")
+ val SPARK_K8S_IMAGE = CommonVars[String]("linkis.spark.k8s.image", "apache/spark:v3.2.1")
+ val SPARK_K8S_IMAGE_PULL_POLICY = CommonVars[String]("linkis.spark.k8s.imagePullPolicy", "Always")
+ val SPARK_K8S_LANGUAGE_TYPE = CommonVars[String]("linkis.spark.k8s.languageType", "Scala")
+ val SPARK_K8S_RESTART_POLICY = CommonVars[String]("linkis.spark.k8s.restartPolicy", "Never")
+ val SPARK_K8S_SPARK_VERSION = CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
+ val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", "default")
+
val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")
val SPARK_PYTHON_TEST_MODE_ENABLE =
@@ -80,7 +92,7 @@ object SparkConfiguration extends Logging {
"Map output compression method(map输出结果压缩方式)"
)
- val SPARK_MASTER = CommonVars[String]("spark.master", "yarn", "Default master(默认master)")
+ val SPARK_MASTER = CommonVars[String]("spark.master", "yarn", "Default yarn(默认yarn)")
val SPARK_CONSOLE_OUTPUT_NUM = CommonVars[Int]("wds.linkis.spark.output.line.limit", 10)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
index 9fb68ed4c..d90b52a21 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
@@ -22,7 +22,7 @@ import org.apache.linkis.engineconn.once.executor.{
OnceExecutorExecutionContext,
OperableOnceExecutor
}
-import org.apache.linkis.engineplugin.spark.client.deployment.YarnApplicationClusterDescriptorAdapter
+import org.apache.linkis.engineplugin.spark.client.deployment.ClusterDescriptorAdapter
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
SPARK_APP_CONF,
SPARK_APPLICATION_ARGS,
@@ -43,7 +43,7 @@ import scala.concurrent.duration.Duration
class SparkSubmitOnceExecutor(
override val id: Long,
override protected val sparkEngineConnContext: SparkEngineConnContext
-) extends SparkOnceExecutor[YarnApplicationClusterDescriptorAdapter]
+) extends SparkOnceExecutor[ClusterDescriptorAdapter]
with OperableOnceExecutor {
private var oldProgress: Float = 0f
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index fa77e6ed8..fef3f0699 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -94,7 +94,21 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val sparkConfig: SparkConfig = new SparkConfig()
sparkConfig.setJavaHome(variable(Environment.JAVA_HOME))
sparkConfig.setSparkHome(SPARK_HOME.getValue(options))
- sparkConfig.setMaster(SPARK_MASTER.getValue(options))
+ val master = SPARK_MASTER.getValue(options)
+ sparkConfig.setMaster(master)
+ if (master.startsWith("k8s")) {
+ sparkConfig.setK8sConfigFile(SPARK_K8S_CONFIG_FILE.getValue(options))
+ sparkConfig.setK8sServiceAccount(SPARK_K8S_SERVICE_ACCOUNT.getValue(options))
+ sparkConfig.setK8sMasterUrl(SPARK_K8S_MASTER_URL.getValue(options))
+ sparkConfig.setK8sUsername(SPARK_K8S_USERNAME.getValue(options))
+ sparkConfig.setK8sPassword(SPARK_K8S_PASSWORD.getValue(options))
+ sparkConfig.setK8sImage(SPARK_K8S_IMAGE.getValue(options))
+ sparkConfig.setK8sNamespace(SPARK_K8S_NAMESPACE.getValue(options))
+ sparkConfig.setK8sSparkVersion(SPARK_K8S_SPARK_VERSION.getValue(options))
+ sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options))
+ sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))
+ sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options))
+ }
sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
sparkConfig.setAppName(SPARK_APP_NAME.getValue(options))
diff --git a/pom.xml b/pom.xml
index f0bfa4c32..1a963da20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,9 @@
<sqoop.version>1.4.6</sqoop.version>
<elasticsearch.version>7.6.2</elasticsearch.version>
+ <!-- This is the same as the kubernetes-client version referenced in spark3.2.1 -->
+ <kubernetes-client.version>5.4.1</kubernetes-client.version>
+
<!-- marshalling -->
<gson.version>2.8.9</gson.version>
<jackson-bom.version>2.13.4.20221013</jackson-bom.version>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 9f83b2721..67641f5c8 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -37,6 +37,7 @@ aws-java-sdk-core-1.11.792.jar
aws-java-sdk-kms-1.11.792.jar
aws-java-sdk-s3-1.11.792.jar
aws-java-sdk-s3-1.12.261.jar
+automaton-1.11-8.jar
bcpkix-jdk15on-1.70.jar
bcprov-jdk15on-1.70.jar
bcutil-jdk15on-1.70.jar
@@ -165,6 +166,7 @@ guice-4.1.0.jar
guice-4.2.2.jar
guice-assistedinject-3.0.jar
guice-servlet-4.0.jar
+generex-1.0.2.jar
hadoop-aliyun-3.3.4.jar
hadoop-annotations-2.7.2.jar
hadoop-annotations-3.3.4.jar
@@ -350,6 +352,27 @@ knife4j-spring-ui-2.0.9.jar
kotlin-stdlib-1.3.72.jar
kotlin-stdlib-common-1.3.72.jar
kryo-2.24.0.jar
+kubernetes-client-5.4.1.jar
+kubernetes-model-admissionregistration-4.13.2.jar
+kubernetes-model-apiextensions-4.13.2.jar
+kubernetes-model-apps-4.13.2.jar
+kubernetes-model-autoscaling-4.13.2.jar
+kubernetes-model-batch-4.13.2.jar
+kubernetes-model-certificates-4.13.2.jar
+kubernetes-model-common-5.4.1.jar
+kubernetes-model-coordination-4.13.2.jar
+kubernetes-model-core-5.4.1.jar
+kubernetes-model-discovery-4.13.2.jar
+kubernetes-model-events-4.13.2.jar
+kubernetes-model-extensions-4.13.2.jar
+kubernetes-model-flowcontrol-5.4.1.jar
+kubernetes-model-metrics-4.13.2.jar
+kubernetes-model-networking-4.13.2.jar
+kubernetes-model-node-4.13.2.jar
+kubernetes-model-policy-4.13.2.jar
+kubernetes-model-rbac-4.13.2.jar
+kubernetes-model-scheduling-4.13.2.jar
+kubernetes-model-storageclass-4.13.2.jar
libfb303-0.9.3.jar
libthrift-0.15.0.pom
libthrift-0.9.3.jar
@@ -363,6 +386,7 @@ log4j-jcl-2.17.2.jar
log4j-jul-2.17.2.jar
log4j-slf4j-impl-2.17.2.jar
log4j-web-2.17.2.jar
+logging-interceptor-3.14.9.jar
lz4-java-1.6.0.jar
lz4-java-1.7.1.jar
mapstruct-1.3.1.Final.jar
@@ -610,4 +634,5 @@ xz-1.5.jar
zookeeper-3.5.9.jar
zookeeper-jute-3.5.9.jar
zstd-jni-1.4.4-7.jar
-zstd-jni-1.4.5-6.jar
\ No newline at end of file
+zstd-jni-1.4.5-6.jar
+zjsonpatch-0.3.0.jar
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org