You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2023/04/25 05:10:53 UTC
[incubator-streampark] 01/01: [Improve]Ingress uses different versions of API according to different K8s versions.
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch ingressv1
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit f6e7b6b9257039f47251e5008ad2e560fe0e4af8
Author: monster <60...@users.noreply.github.com>
AuthorDate: Tue Apr 25 13:10:41 2023 +0800
[Improve]Ingress uses different versions of API according to different K8s versions.
---
.../core/service/impl/ApplicationServiceImpl.java | 2 +-
.../flink/kubernetes/IngressController.scala | 230 ---------------------
.../flink/kubernetes/KubernetesRetriever.scala | 1 +
.../kubernetes/ingress/BaseIngressStrategy.scala | 123 +++++++++++
.../kubernetes/ingress/IngressController.scala | 55 +++++
.../flink/kubernetes/ingress/IngressStrategy.scala | 30 +++
.../kubernetes/ingress/IngressStrategyV1.scala | 98 +++++++++
.../ingress/IngressStrategyV1beta1.scala | 92 +++++++++
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 3 +-
.../flink/kubernetes/FlinkRestJsonTest.scala | 3 +-
.../impl/FlinkK8sApplicationBuildPipeline.scala | 3 +-
11 files changed, 406 insertions(+), 234 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3fbee60fa..2242b897e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -86,8 +86,8 @@ import org.apache.streampark.flink.client.bean.SubmitRequest;
import org.apache.streampark.flink.client.bean.SubmitResponse;
import org.apache.streampark.flink.core.conf.ParameterCli;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
-import org.apache.streampark.flink.kubernetes.IngressController;
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
+import org.apache.streampark.flink.kubernetes.ingress.IngressController;
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
deleted file mode 100644
index fd758f4d3..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.kubernetes
-
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils._
-
-import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
-import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
-import org.apache.commons.io.FileUtils
-import org.apache.flink.client.program.ClusterClient
-import org.json4s.{DefaultFormats, JArray}
-import org.json4s.jackson.JsonMethods.parse
-
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.Paths
-
-import scala.collection.JavaConverters._
-import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
-
-object IngressController extends Logger {
-
- def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
- Try(new DefaultKubernetesClient) match {
- case Success(client) =>
- val annotMap = Map[String, String](
- "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
- "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
- "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
- )
- val labelsMap = Map[String, String](
- "app" -> clusterId,
- "type" -> "flink-native-kubernetes",
- "component" -> "ingress")
-
- val deployment = client
- .apps()
- .deployments()
- .inNamespace(nameSpace)
- .withName(clusterId)
- .get()
-
- val deploymentUid = if (deployment != null) {
- deployment.getMetadata.getUid
- } else {
- throw new RuntimeException(
- s"Deployment with name $clusterId not found in namespace $nameSpace")
- }
-
- // Create OwnerReference object
- val ownerReference = new OwnerReferenceBuilder()
- .withApiVersion("apps/v1")
- .withKind("Deployment")
- .withName(clusterId)
- .withUid(deploymentUid)
- .withController(true)
- .withBlockOwnerDeletion(true)
- .build()
-
- val ingress = new IngressBuilder()
- .withNewMetadata()
- .withName(clusterId)
- .addToAnnotations(annotMap.asJava)
- .addToLabels(labelsMap.asJava)
- .addToOwnerReferences(ownerReference) // Add OwnerReference
- .endMetadata()
- .withNewSpec()
- .addNewRule()
- .withHost(domainName)
- .withNewHttp()
- .addNewPath()
- .withPath(s"/$nameSpace/$clusterId/")
- .withPathType("ImplementationSpecific")
- .withNewBackend()
- .withNewService()
- .withName(s"$clusterId-rest")
- .withNewPort()
- .withName("rest")
- .endPort()
- .endService()
- .endBackend()
- .endPath()
- .addNewPath()
- .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
- .withPathType("ImplementationSpecific")
- .withNewBackend()
- .withNewService()
- .withName(s"$clusterId-rest")
- .withNewPort()
- .withName("rest")
- .endPort()
- .endService()
- .endBackend()
- .endPath()
- .endHttp()
- .endRule()
- .endSpec()
- .build();
- client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
-
- case _ =>
- }
- }
-
- def configureIngress(ingressOutput: String): Unit = {
- close {
- val client = new DefaultKubernetesClient
- client.network.ingress
- .load(Files.newInputStream(Paths.get(ingressOutput)))
- .get()
- client
- }
- }
-
- private[this] def determineThePodSurvivalStatus(name: String, nameSpace: String): Boolean = {
- tryWithResource(KubernetesRetriever.newK8sClient()) {
- client =>
- Try {
- client
- .apps()
- .deployments()
- .inNamespace(nameSpace)
- .withName(name)
- .get()
- .getSpec()
- .getSelector()
- .getMatchLabels()
- false
- }.getOrElse(true)
- }
- }
-
- def ingressUrlAddress(
- nameSpace: String,
- clusterId: String,
- clusterClient: ClusterClient[_]): String = {
- val client = new DefaultKubernetesClient
- // for kubernetes 1.22+
- lazy val fromV1 =
- Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
- .map(ingress => ingress.getSpec.getRules.get(0))
- .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- // for kubernetes 1.22-
- lazy val fromV1beta1 =
- Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
- .map(ingress => ingress.getSpec.getRules.get(0))
- .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- Try(
- fromV1
- .orElse(fromV1beta1)
- .map { case (host, path) => s"https://$host$path" }
- .getOrElse(clusterClient.getWebInterfaceURL)
- ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
- }
-
- @throws[IOException]
- def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
- val workspaceDir = new File(buildWorkspace)
- if (!workspaceDir.exists) workspaceDir.mkdir
- if (ingressTemplates.isEmpty) null;
- else {
- val outputPath = buildWorkspace + "/ingress.yaml"
- val outputFile = new File(outputPath)
- FileUtils.write(outputFile, ingressTemplates, "UTF-8")
- outputPath
- }
- }
-
-}
-
-case class IngressMeta(
- addresses: List[String],
- port: Integer,
- protocol: String,
- serviceName: String,
- ingressName: String,
- hostname: String,
- path: String,
- allNodes: Boolean)
-
-object IngressMeta {
-
- @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
-
- def as(json: String): Option[List[IngressMeta]] = {
- Try(parse(json)) match {
- case Success(ok) =>
- ok match {
- case JArray(arr) =>
- val list = arr.map(
- x => {
- IngressMeta(
- addresses =
- (x \ "addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
- port = (x \ "port").extractOpt[Integer].getOrElse(0),
- protocol = (x \ "protocol").extractOpt[String].getOrElse(null),
- serviceName = (x \ "serviceName").extractOpt[String].getOrElse(null),
- ingressName = (x \ "ingressName").extractOpt[String].getOrElse(null),
- hostname = (x \ "hostname").extractOpt[String].getOrElse(null),
- path = (x \ "path").extractOpt[String].getOrElse(null),
- allNodes = (x \ "allNodes").extractOpt[Boolean].getOrElse(false)
- )
- })
- Some(list)
- case _ => None
- }
- case Failure(_) => None
- }
- }
-
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index b2808fc1c..d4e6ec0ed 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -20,6 +20,7 @@ package org.apache.streampark.flink.kubernetes
import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.common.util.Utils.tryWithResource
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala
new file mode 100644
index 000000000..0a0e456d0
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.commons.io.FileUtils
+import org.apache.flink.client.program.ClusterClient
+import org.json4s.{DefaultFormats, JArray, JValue}
+import org.json4s.jackson.JsonMethods.parse
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Behaviour shared by every [[IngressStrategy]]: resolving the cluster URL, persisting Ingress
+ * templates and the [[IngressMeta]] JSON model. Creating the Ingress is left to subclasses.
+ */
+abstract class BaseIngressStrategy extends IngressStrategy {
+
+  /**
+   * Builds `https://host/path` from the first rule of the Ingress named `clusterId`. The
+   * `networking.k8s.io/v1` API is asked first, then `v1beta1`; when neither holds such an Ingress
+   * the web interface URL reported by `clusterClient` is returned.
+   *
+   * @throws RuntimeException if the lookup fails; the underlying failure is kept as the cause
+   */
+  override def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String = {
+    val client = new DefaultKubernetesClient
+    // Close the client once the lookup is over so that its resources are not leaked per call.
+    try {
+      // for kubernetes 1.19+
+      lazy val fromV1 =
+        Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+          .map(ingress => ingress.getSpec.getRules.get(0))
+          .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+      // for kubernetes 1.19-; lazy, so it is only queried when no v1 Ingress was found
+      lazy val fromV1beta1 =
+        Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+          .map(ingress => ingress.getSpec.getRules.get(0))
+          .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+      Try(
+        fromV1
+          .orElse(fromV1beta1)
+          .map { case (host, path) => s"https://$host$path" }
+          .getOrElse(clusterClient.getWebInterfaceURL)
+      ) match {
+        case Success(url) => url
+        case Failure(e) =>
+          throw new RuntimeException("[StreamPark] get ingressUrlAddress error.", e)
+      }
+    } finally {
+      client.close()
+    }
+  }
+
+  /**
+   * Stores `ingressTemplates` as `ingress.yaml` (UTF-8) inside `buildWorkspace`, creating the
+   * directory and any missing parents first.
+   *
+   * @return the path of the written file, or `null` when there is no template content
+   */
+  override def prepareIngressTemplateFiles(
+      buildWorkspace: String,
+      ingressTemplates: String): String = {
+    val workspaceDir = new File(buildWorkspace)
+    if (!workspaceDir.exists) workspaceDir.mkdirs()
+    // A missing template is treated like an empty one instead of failing with an NPE.
+    if (ingressTemplates == null || ingressTemplates.isEmpty) null
+    else {
+      val outputPath = s"$buildWorkspace/ingress.yaml"
+      FileUtils.write(new File(outputPath), ingressTemplates, "UTF-8")
+      outputPath
+    }
+  }
+
+  /**
+   * Loads the Ingress manifest stored at `ingressOutput` and issues a `get()` for it through the
+   * client. The manifest stream and the client are closed even when the call fails.
+   *
+   * NOTE(review): `load(...).get()` looks read-only; confirm whether the manifest should be applied.
+   */
+  def configureIngress(ingressOutput: String): Unit = {
+    val client = new DefaultKubernetesClient
+    try {
+      val manifest = Files.newInputStream(Paths.get(ingressOutput))
+      try client.network.ingress.load(manifest).get()
+      finally manifest.close()
+    } finally {
+      client.close()
+    }
+  }
+
+  /** One entry of the Ingress description JSON decoded by [[IngressMeta.as]]. */
+  case class IngressMeta(
+      addresses: List[String],
+      port: Integer,
+      protocol: String,
+      serviceName: String,
+      ingressName: String,
+      hostname: String,
+      path: String,
+      allNodes: Boolean)
+
+  object IngressMeta {
+
+    @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+    /**
+     * Parses a JSON array into [[IngressMeta]] entries; absent fields fall back to an empty list,
+     * `0`, `null` or `false`. Returns `None` when `json` is not valid JSON or not an array.
+     */
+    def as(json: String): Option[List[IngressMeta]] =
+      Try(parse(json)) match {
+        case Success(JArray(items)) => Some(items.map(fromJson))
+        case _ => None
+      }
+
+    /** Maps one element of the JSON array onto an [[IngressMeta]]. */
+    private[this] def fromJson(node: JValue): IngressMeta =
+      IngressMeta(
+        addresses = (node \ "addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
+        port = (node \ "port").extractOpt[Integer].getOrElse(0),
+        protocol = (node \ "protocol").extractOpt[String].orNull,
+        serviceName = (node \ "serviceName").extractOpt[String].orNull,
+        ingressName = (node \ "ingressName").extractOpt[String].orNull,
+        hostname = (node \ "hostname").extractOpt[String].orNull,
+        path = (node \ "path").extractOpt[String].orNull,
+        allNodes = (node \ "allNodes").extractOpt[Boolean].getOrElse(false)
+      )
+  }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
new file mode 100644
index 000000000..eca3d566b
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import org.apache.streampark.common.util.Logger
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.client.program.ClusterClient
+
+import scala.language.postfixOps
+import scala.util.Try
+
+/**
+ * Entry point for Flink-on-Kubernetes Ingress handling. Every call is forwarded to the
+ * [[IngressStrategy]] that matches the Ingress API version served by the connected cluster.
+ */
+object IngressController extends Logger {
+
+  /** First minor release of Kubernetes 1.x that serves the networking.k8s.io/v1 Ingress API. */
+  private[this] val IngressV1MinMinor = 19
+
+  /**
+   * Strategy for the connected cluster. It is resolved lazily, so loading this object does not
+   * need a reachable API server, and a failed lookup is retried on the next access.
+   */
+  lazy val ingressStrategy: IngressStrategy = {
+    val (major, minor) = kubernetesMajorMinor()
+    if (major > 1 || (major == 1 && minor >= IngressV1MinMinor)) new IngressStrategyV1()
+    else new IngressStrategyV1beta1()
+  }
+
+  /** Delegates Ingress creation for `clusterId` to the active strategy. */
+  def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit =
+    ingressStrategy.configureIngress(domainName, clusterId, nameSpace)
+
+  /** Delegates the lookup of the cluster's web address to the active strategy. */
+  def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String =
+    ingressStrategy.ingressUrlAddress(nameSpace, clusterId, clusterClient)
+
+  /** Delegates writing of the Ingress template file to the active strategy. */
+  def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String =
+    ingressStrategy.prepareIngressTemplateFiles(buildWorkspace, ingressTemplates)
+
+  /**
+   * Returns the server version as `major + minor / 100`, i.e. 1.09 for Kubernetes 1.9 and 1.19
+   * for Kubernetes 1.19, so that numeric order follows release order.
+   *
+   * @throws IllegalStateException if the server reports no parsable version
+   */
+  def getKubernetesVersion(): Double = {
+    val (major, minor) = kubernetesMajorMinor()
+    major + minor / 100.0
+  }
+
+  /** Reads (major, minor) from the API server; the client is closed afterwards. */
+  private[this] def kubernetesMajorMinor(): (Int, Int) = {
+    val client = new DefaultKubernetesClient()
+    try {
+      val info = Option(client.getVersion).getOrElse(
+        throw new IllegalStateException("[StreamPark] Kubernetes version is unavailable."))
+      (leadingInt(info.getMajor), leadingInt(info.getMinor))
+    } finally {
+      client.close()
+    }
+  }
+
+  /** Parses the leading digits of a version part; some distributions report minors like "19+". */
+  private[this] def leadingInt(part: String): Int =
+    Try(Option(part).getOrElse("").trim.takeWhile(_.isDigit).toInt).getOrElse(
+      throw new IllegalStateException(
+        s"[StreamPark] unparsable Kubernetes version part: $part"))
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
new file mode 100644
index 000000000..7137f3c07
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import org.apache.flink.client.program.ClusterClient
+
+/**
+ * Operations needed to expose a Flink cluster through a Kubernetes Ingress. Implementations
+ * target one version of the Ingress API.
+ */
+trait IngressStrategy {
+
+  /** Creates the Ingress for the cluster `clusterId` in `nameSpace`, served under `domainName`. */
+  def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit
+
+  /** Returns the web address under which the cluster `clusterId` in `nameSpace` is reachable. */
+  def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String
+
+  /** Persists `ingressTemplates` below `buildWorkspace` and returns the resulting file path. */
+  def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
new file mode 100644
index 000000000..4bff48fc9
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
+import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.Try
+
+/** [[IngressStrategy]] for clusters serving the `networking.k8s.io/v1` Ingress API (1.19+). */
+class IngressStrategyV1 extends BaseIngressStrategy {
+
+  /**
+   * Creates a `networking.k8s.io/v1` Ingress named `clusterId` in `nameSpace`. It routes the paths
+   * below `/nameSpace/clusterId` on `domainName` to the `rest` port of the `clusterId-rest`
+   * service and carries an owner reference to the Deployment named `clusterId`.
+   *
+   * @throws RuntimeException if that Deployment does not exist in `nameSpace`
+   */
+  override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
+    // Best effort: when no client can be created from the local configuration, nothing happens.
+    Try(new DefaultKubernetesClient).foreach {
+      client =>
+        try {
+          val owner = deploymentOwner(client, clusterId, nameSpace)
+          val ingress = buildIngress(domainName, clusterId, nameSpace, owner)
+          client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
+        } finally {
+          client.close()
+        }
+    }
+  }
+
+  /** Builds the owner reference pointing at the Deployment `clusterId`, which must exist. */
+  private[this] def deploymentOwner(
+      client: DefaultKubernetesClient,
+      clusterId: String,
+      nameSpace: String): OwnerReference = {
+    val deployment = client.apps().deployments().inNamespace(nameSpace).withName(clusterId).get()
+    val deploymentUid = Option(deployment)
+      .map(_.getMetadata.getUid)
+      .getOrElse(
+        throw new RuntimeException(
+          s"Deployment with name $clusterId not found in namespace $nameSpace"))
+    new OwnerReferenceBuilder()
+      .withApiVersion("apps/v1")
+      .withKind("Deployment")
+      .withName(clusterId)
+      .withUid(deploymentUid)
+      .withController(true)
+      .withBlockOwnerDeletion(true)
+      .build()
+  }
+
+  /** Assembles the v1 Ingress with the nginx rewrite annotations and the two path rules. */
+  private[this] def buildIngress(
+      domainName: String,
+      clusterId: String,
+      nameSpace: String,
+      owner: OwnerReference): Ingress = {
+    val annotMap = Map[String, String](
+      "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
+      "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
+      "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
+    )
+    val labelsMap = Map[String, String](
+      "app" -> clusterId,
+      "type" -> "flink-native-kubernetes",
+      "component" -> "ingress")
+    new IngressBuilder()
+      .withNewMetadata()
+      .withName(clusterId)
+      .addToAnnotations(annotMap.asJava)
+      .addToLabels(labelsMap.asJava)
+      .addToOwnerReferences(owner)
+      .endMetadata()
+      .withNewSpec()
+      .addNewRule()
+      .withHost(domainName)
+      .withNewHttp()
+      .addNewPath()
+      .withPath(s"/$nameSpace/$clusterId/")
+      .withPathType("ImplementationSpecific")
+      .withNewBackend()
+      .withNewService()
+      .withName(s"$clusterId-rest")
+      .withNewPort()
+      .withName("rest")
+      .endPort()
+      .endService()
+      .endBackend()
+      .endPath()
+      .addNewPath()
+      .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
+      .withPathType("ImplementationSpecific")
+      .withNewBackend()
+      .withNewService()
+      .withName(s"$clusterId-rest")
+      .withNewPort()
+      .withName("rest")
+      .endPort()
+      .endService()
+      .endBackend()
+      .endPath()
+      .endHttp()
+      .endRule()
+      .endSpec()
+      .build()
+  }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
new file mode 100644
index 000000000..8a1fabf62
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReference, OwnerReferenceBuilder}
+import io.fabric8.kubernetes.api.model.networking.v1beta1.{Ingress, IngressBuilder}
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.Try
+
+/** [[IngressStrategy]] for clusters that only serve the `v1beta1` Ingress API (before 1.19). */
+class IngressStrategyV1beta1 extends BaseIngressStrategy {
+
+  /**
+   * Creates a `v1beta1` Ingress named `clusterId` in `nameSpace`. It routes the paths below
+   * `/nameSpace/clusterId` on `domainName` to the `rest` port of the `clusterId-rest` service
+   * and carries an owner reference to the Deployment named `clusterId`.
+   *
+   * @throws RuntimeException if that Deployment does not exist in `nameSpace`
+   */
+  override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
+    // Best effort: when no client can be created from the local configuration, nothing happens.
+    Try(new DefaultKubernetesClient).foreach {
+      client =>
+        try {
+          val owner = deploymentOwner(client, clusterId, nameSpace)
+          val ingress = buildIngress(domainName, clusterId, nameSpace, owner)
+          client.network.v1beta1.ingresses.inNamespace(nameSpace).create(ingress)
+        } finally {
+          client.close()
+        }
+    }
+  }
+
+  /** Builds the owner reference pointing at the Deployment `clusterId`, which must exist. */
+  private[this] def deploymentOwner(
+      client: DefaultKubernetesClient,
+      clusterId: String,
+      nameSpace: String): OwnerReference = {
+    val deployment = client.apps().deployments().inNamespace(nameSpace).withName(clusterId).get()
+    val deploymentUid = Option(deployment)
+      .map(_.getMetadata.getUid)
+      .getOrElse(
+        throw new RuntimeException(
+          s"Deployment with name $clusterId not found in namespace $nameSpace"))
+    new OwnerReferenceBuilder()
+      .withApiVersion("apps/v1")
+      .withKind("Deployment")
+      .withName(clusterId)
+      .withUid(deploymentUid)
+      .withController(true)
+      .withBlockOwnerDeletion(true)
+      .build()
+  }
+
+  /** Assembles the v1beta1 Ingress with the nginx rewrite annotations and the two path rules. */
+  private[this] def buildIngress(
+      domainName: String,
+      clusterId: String,
+      nameSpace: String,
+      owner: OwnerReference): Ingress = {
+    val annotMap = Map[String, String](
+      "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
+      "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
+      "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
+    )
+    val labelsMap = Map[String, String](
+      "app" -> clusterId,
+      "type" -> "flink-native-kubernetes",
+      "component" -> "ingress")
+    new IngressBuilder()
+      .withNewMetadata()
+      .withName(clusterId)
+      .addToAnnotations(annotMap.asJava)
+      .addToLabels(labelsMap.asJava)
+      .addToOwnerReferences(owner)
+      .endMetadata()
+      .withNewSpec()
+      .addNewRule()
+      .withHost(domainName)
+      .withNewHttp()
+      .addNewPath()
+      .withPath(s"/$nameSpace/$clusterId/")
+      .withNewBackend()
+      .withServiceName(s"$clusterId-rest")
+      .withServicePort(new IntOrString("rest"))
+      .endBackend()
+      .endPath()
+      .addNewPath()
+      .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
+      .withNewBackend()
+      .withServiceName(s"$clusterId-rest")
+      .withServicePort(new IntOrString("rest"))
+      .endBackend()
+      .endPath()
+      .endHttp()
+      .endRule()
+      .endSpec()
+      .build()
+  }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 03d68b3d3..2dc756a0d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -19,11 +19,12 @@ package org.apache.streampark.flink.kubernetes.watcher
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, IngressController, JobStatusWatcherConfig, KubernetesRetriever}
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model._
import com.google.common.base.Charsets
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 5567cf5e9..5539222fe 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.kubernetes
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
+import org.apache.streampark.flink.kubernetes.ingress.{IngressStrategyV1, IngressStrategyV1beta1}
import org.apache.streampark.flink.kubernetes.watcher.{Checkpoint, FlinkRestJmConfigItem, FlinkRestOverview, JobDetails}
import com.google.common.base.Charsets
@@ -294,7 +295,7 @@ class FlinkRestJsonTest {
|]
|""".stripMargin
- val ingressMeta = IngressMeta.as(json)
+ val ingressMeta = new IngressStrategyV1().IngressMeta.as(json)
println(ingressMeta.get)
}
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 93b8aab17..fc3fe4738 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -20,7 +20,8 @@ package org.apache.streampark.flink.packer.pipeline.impl
import org.apache.streampark.common.enums.DevelopmentMode
import org.apache.streampark.common.fs.LfsOperator
import org.apache.streampark.common.util.ThreadUtils
-import org.apache.streampark.flink.kubernetes.{IngressController, PodTemplateTool}
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.packer.docker._
import org.apache.streampark.flink.packer.maven.MavenTool
import org.apache.streampark.flink.packer.pipeline._