You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2023/04/25 05:10:53 UTC

[incubator-streampark] 01/01: [Improve]Ingress uses different versions of API according to different K8s versions.

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch ingressv1
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit f6e7b6b9257039f47251e5008ad2e560fe0e4af8
Author: monster <60...@users.noreply.github.com>
AuthorDate: Tue Apr 25 13:10:41 2023 +0800

    [Improve]Ingress uses different versions of API according to different K8s versions.
---
 .../core/service/impl/ApplicationServiceImpl.java  |   2 +-
 .../flink/kubernetes/IngressController.scala       | 230 ---------------------
 .../flink/kubernetes/KubernetesRetriever.scala     |   1 +
 .../kubernetes/ingress/BaseIngressStrategy.scala   | 123 +++++++++++
 .../kubernetes/ingress/IngressController.scala     |  55 +++++
 .../flink/kubernetes/ingress/IngressStrategy.scala |  30 +++
 .../kubernetes/ingress/IngressStrategyV1.scala     |  98 +++++++++
 .../ingress/IngressStrategyV1beta1.scala           |  92 +++++++++
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala |   3 +-
 .../flink/kubernetes/FlinkRestJsonTest.scala       |   3 +-
 .../impl/FlinkK8sApplicationBuildPipeline.scala    |   3 +-
 11 files changed, 406 insertions(+), 234 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3fbee60fa..2242b897e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -86,8 +86,8 @@ import org.apache.streampark.flink.client.bean.SubmitRequest;
 import org.apache.streampark.flink.client.bean.SubmitResponse;
 import org.apache.streampark.flink.core.conf.ParameterCli;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
-import org.apache.streampark.flink.kubernetes.IngressController;
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
+import org.apache.streampark.flink.kubernetes.ingress.IngressController;
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
deleted file mode 100644
index fd758f4d3..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.kubernetes
-
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils._
-
-import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
-import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
-import org.apache.commons.io.FileUtils
-import org.apache.flink.client.program.ClusterClient
-import org.json4s.{DefaultFormats, JArray}
-import org.json4s.jackson.JsonMethods.parse
-
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.Paths
-
-import scala.collection.JavaConverters._
-import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
-
-object IngressController extends Logger {
-
-  def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
-    Try(new DefaultKubernetesClient) match {
-      case Success(client) =>
-        val annotMap = Map[String, String](
-          "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
-          "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
-          "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
-        )
-        val labelsMap = Map[String, String](
-          "app" -> clusterId,
-          "type" -> "flink-native-kubernetes",
-          "component" -> "ingress")
-
-        val deployment = client
-          .apps()
-          .deployments()
-          .inNamespace(nameSpace)
-          .withName(clusterId)
-          .get()
-
-        val deploymentUid = if (deployment != null) {
-          deployment.getMetadata.getUid
-        } else {
-          throw new RuntimeException(
-            s"Deployment with name $clusterId not found in namespace $nameSpace")
-        }
-
-        // Create OwnerReference object
-        val ownerReference = new OwnerReferenceBuilder()
-          .withApiVersion("apps/v1")
-          .withKind("Deployment")
-          .withName(clusterId)
-          .withUid(deploymentUid)
-          .withController(true)
-          .withBlockOwnerDeletion(true)
-          .build()
-
-        val ingress = new IngressBuilder()
-          .withNewMetadata()
-          .withName(clusterId)
-          .addToAnnotations(annotMap.asJava)
-          .addToLabels(labelsMap.asJava)
-          .addToOwnerReferences(ownerReference) // Add OwnerReference
-          .endMetadata()
-          .withNewSpec()
-          .addNewRule()
-          .withHost(domainName)
-          .withNewHttp()
-          .addNewPath()
-          .withPath(s"/$nameSpace/$clusterId/")
-          .withPathType("ImplementationSpecific")
-          .withNewBackend()
-          .withNewService()
-          .withName(s"$clusterId-rest")
-          .withNewPort()
-          .withName("rest")
-          .endPort()
-          .endService()
-          .endBackend()
-          .endPath()
-          .addNewPath()
-          .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
-          .withPathType("ImplementationSpecific")
-          .withNewBackend()
-          .withNewService()
-          .withName(s"$clusterId-rest")
-          .withNewPort()
-          .withName("rest")
-          .endPort()
-          .endService()
-          .endBackend()
-          .endPath()
-          .endHttp()
-          .endRule()
-          .endSpec()
-          .build();
-        client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
-
-      case _ =>
-    }
-  }
-
-  def configureIngress(ingressOutput: String): Unit = {
-    close {
-      val client = new DefaultKubernetesClient
-      client.network.ingress
-        .load(Files.newInputStream(Paths.get(ingressOutput)))
-        .get()
-      client
-    }
-  }
-
-  private[this] def determineThePodSurvivalStatus(name: String, nameSpace: String): Boolean = {
-    tryWithResource(KubernetesRetriever.newK8sClient()) {
-      client =>
-        Try {
-          client
-            .apps()
-            .deployments()
-            .inNamespace(nameSpace)
-            .withName(name)
-            .get()
-            .getSpec()
-            .getSelector()
-            .getMatchLabels()
-          false
-        }.getOrElse(true)
-    }
-  }
-
-  def ingressUrlAddress(
-      nameSpace: String,
-      clusterId: String,
-      clusterClient: ClusterClient[_]): String = {
-    val client = new DefaultKubernetesClient
-    // for kubernetes 1.22+
-    lazy val fromV1 =
-      Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
-        .map(ingress => ingress.getSpec.getRules.get(0))
-        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-    // for kubernetes 1.22-
-    lazy val fromV1beta1 =
-      Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
-        .map(ingress => ingress.getSpec.getRules.get(0))
-        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-    Try(
-      fromV1
-        .orElse(fromV1beta1)
-        .map { case (host, path) => s"https://$host$path" }
-        .getOrElse(clusterClient.getWebInterfaceURL)
-    ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
-  }
-
-  @throws[IOException]
-  def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
-    val workspaceDir = new File(buildWorkspace)
-    if (!workspaceDir.exists) workspaceDir.mkdir
-    if (ingressTemplates.isEmpty) null;
-    else {
-      val outputPath = buildWorkspace + "/ingress.yaml"
-      val outputFile = new File(outputPath)
-      FileUtils.write(outputFile, ingressTemplates, "UTF-8")
-      outputPath
-    }
-  }
-
-}
-
-case class IngressMeta(
-    addresses: List[String],
-    port: Integer,
-    protocol: String,
-    serviceName: String,
-    ingressName: String,
-    hostname: String,
-    path: String,
-    allNodes: Boolean)
-
-object IngressMeta {
-
-  @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
-
-  def as(json: String): Option[List[IngressMeta]] = {
-    Try(parse(json)) match {
-      case Success(ok) =>
-        ok match {
-          case JArray(arr) =>
-            val list = arr.map(
-              x => {
-                IngressMeta(
-                  addresses =
-                    (x \ "addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
-                  port = (x \ "port").extractOpt[Integer].getOrElse(0),
-                  protocol = (x \ "protocol").extractOpt[String].getOrElse(null),
-                  serviceName = (x \ "serviceName").extractOpt[String].getOrElse(null),
-                  ingressName = (x \ "ingressName").extractOpt[String].getOrElse(null),
-                  hostname = (x \ "hostname").extractOpt[String].getOrElse(null),
-                  path = (x \ "path").extractOpt[String].getOrElse(null),
-                  allNodes = (x \ "allNodes").extractOpt[Boolean].getOrElse(false)
-                )
-              })
-            Some(list)
-          case _ => None
-        }
-      case Failure(_) => None
-    }
-  }
-
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index b2808fc1c..d4e6ec0ed 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -20,6 +20,7 @@ package org.apache.streampark.flink.kubernetes
 import org.apache.streampark.common.util.{Logger, Utils}
 import org.apache.streampark.common.util.Utils.tryWithResource
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala
new file mode 100644
index 000000000..0a0e456d0
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/BaseIngressStrategy.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import org.apache.streampark.common.util.Utils._
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.commons.io.FileUtils
+import org.apache.flink.client.program.ClusterClient
+import org.json4s.{DefaultFormats, JArray}
+import org.json4s.jackson.JsonMethods.parse
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+
+abstract class BaseIngressStrategy extends IngressStrategy {
+  override def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String = {
+    val client = new DefaultKubernetesClient
+    // for kubernetes 1.19+
+    lazy val fromV1 =
+      Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+        .map(ingress => ingress.getSpec.getRules.get(0))
+        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+    // for kubernetes 1.19-
+    lazy val fromV1beta1 =
+      Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+        .map(ingress => ingress.getSpec.getRules.get(0))
+        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+    Try(
+      fromV1
+        .orElse(fromV1beta1)
+        .map { case (host, path) => s"https://$host$path" }
+        .getOrElse(clusterClient.getWebInterfaceURL)
+    ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress error."))
+  }
+
+  override def prepareIngressTemplateFiles(
+      buildWorkspace: String,
+      ingressTemplates: String): String = {
+    val workspaceDir = new File(buildWorkspace)
+    if (!workspaceDir.exists) workspaceDir.mkdir
+    if (ingressTemplates.isEmpty) null
+    else {
+      val outputPath = buildWorkspace + "/ingress.yaml"
+      val outputFile = new File(outputPath)
+      FileUtils.write(outputFile, ingressTemplates, "UTF-8")
+      outputPath
+    }
+  }
+
+  def configureIngress(ingressOutput: String): Unit = {
+    close {
+      val client = new DefaultKubernetesClient
+      client.network.ingress
+        .load(Files.newInputStream(Paths.get(ingressOutput)))
+        .get()
+      client
+    }
+  }
+
+  case class IngressMeta(
+      addresses: List[String],
+      port: Integer,
+      protocol: String,
+      serviceName: String,
+      ingressName: String,
+      hostname: String,
+      path: String,
+      allNodes: Boolean)
+
+  object IngressMeta {
+
+    @transient implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+    def as(json: String): Option[List[IngressMeta]] = {
+      Try(parse(json)) match {
+        case Success(ok) =>
+          ok match {
+            case JArray(arr) =>
+              val list = arr.map(
+                x => {
+                  IngressMeta(
+                    addresses =
+                      (x \ "addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
+                    port = (x \ "port").extractOpt[Integer].getOrElse(0),
+                    protocol = (x \ "protocol").extractOpt[String].orNull,
+                    serviceName = (x \ "serviceName").extractOpt[String].orNull,
+                    ingressName = (x \ "ingressName").extractOpt[String].orNull,
+                    hostname = (x \ "hostname").extractOpt[String].orNull,
+                    path = (x \ "path").extractOpt[String].orNull,
+                    allNodes = (x \ "allNodes").extractOpt[Boolean].getOrElse(false)
+                  )
+                })
+              Some(list)
+            case _ => None
+          }
+        case Failure(_) => None
+      }
+    }
+
+  }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
new file mode 100644
index 000000000..eca3d566b
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import org.apache.streampark.common.util.Logger
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.client.program.ClusterClient
+
+import scala.language.postfixOps
+
+object IngressController extends Logger {
+
+  val ingressStrategy: IngressStrategy = {
+    if (getKubernetesVersion() >= 1.19) new IngressStrategyV1()
+    else new IngressStrategyV1beta1()
+  }
+
+  def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
+    ingressStrategy.configureIngress(domainName, clusterId, nameSpace)
+  }
+
+  def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String = {
+    ingressStrategy.ingressUrlAddress(nameSpace, clusterId, clusterClient)
+  }
+
+  def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
+    ingressStrategy.prepareIngressTemplateFiles(buildWorkspace, ingressTemplates)
+  }
+
+  def getKubernetesVersion(): Double = {
+    val client = new DefaultKubernetesClient()
+    val versionInfo = client.getVersion
+    val version = versionInfo.getMajor.toDouble + versionInfo.getMinor.toDouble / 10
+    version
+  }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
new file mode 100644
index 000000000..7137f3c07
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import org.apache.flink.client.program.ClusterClient
+
+trait IngressStrategy {
+  def ingressUrlAddress(
+      nameSpace: String,
+      clusterId: String,
+      clusterClient: ClusterClient[_]): String
+  def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String
+
+  def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
new file mode 100644
index 000000000..4bff48fc9
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.{Success, Try}
+
+class IngressStrategyV1 extends BaseIngressStrategy {
+  override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
+    Try(new DefaultKubernetesClient) match {
+      case Success(client) =>
+        val annotMap = Map[String, String](
+          "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
+          "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
+          "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
+        )
+        val labelsMap = Map[String, String](
+          "app" -> clusterId,
+          "type" -> "flink-native-kubernetes",
+          "component" -> "ingress")
+
+        val deployment = client
+          .apps()
+          .deployments()
+          .inNamespace(nameSpace)
+          .withName(clusterId)
+          .get()
+
+        val deploymentUid = if (deployment != null) {
+          deployment.getMetadata.getUid
+        } else {
+          throw new RuntimeException(
+            s"Deployment with name $clusterId not found in namespace $nameSpace")
+        }
+
+        // Create OwnerReference object
+        val ownerReference = new OwnerReferenceBuilder()
+          .withApiVersion("apps/v1")
+          .withKind("Deployment")
+          .withName(clusterId)
+          .withUid(deploymentUid)
+          .withController(true)
+          .withBlockOwnerDeletion(true)
+          .build()
+
+        val ingress = new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder()
+          .withNewMetadata()
+          .withName(clusterId)
+          .addToAnnotations(annotMap.asJava)
+          .addToLabels(labelsMap.asJava)
+          .addToOwnerReferences(ownerReference)
+          .endMetadata()
+          .withNewSpec()
+          .addNewRule()
+          .withHost(domainName)
+          .withNewHttp()
+          .addNewPath()
+          .withPath(s"/$nameSpace/$clusterId/")
+          .withNewBackend()
+          .withServiceName(s"$clusterId-rest")
+          .withServicePort(new IntOrString("rest"))
+          .endBackend()
+          .endPath()
+          .addNewPath()
+          .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
+          .withNewBackend()
+          .withServiceName(s"$clusterId-rest")
+          .withServicePort(new IntOrString("rest"))
+          .endBackend()
+          .endPath()
+          .endHttp()
+          .endRule()
+          .endSpec()
+          .build()
+        client.network.ingress.inNamespace(nameSpace).create(ingress)
+      case _ =>
+    }
+  }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
new file mode 100644
index 000000000..8a1fabf62
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -0,0 +1,92 @@
+package org.apache.streampark.flink.kubernetes.ingress
+
+import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.{Success, Try}
+
+class IngressStrategyV1beta1 extends BaseIngressStrategy {
+  override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
+    Try(new DefaultKubernetesClient) match {
+      case Success(client) =>
+        val annotMap = Map[String, String](
+          "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
+          "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
+          "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
+        )
+        val labelsMap = Map[String, String](
+          "app" -> clusterId,
+          "type" -> "flink-native-kubernetes",
+          "component" -> "ingress")
+
+        val deployment = client
+          .apps()
+          .deployments()
+          .inNamespace(nameSpace)
+          .withName(clusterId)
+          .get()
+
+        val deploymentUid = if (deployment != null) {
+          deployment.getMetadata.getUid
+        } else {
+          throw new RuntimeException(
+            s"Deployment with name $clusterId not found in namespace $nameSpace")
+        }
+
+        // Create OwnerReference object
+        val ownerReference = new OwnerReferenceBuilder()
+          .withApiVersion("apps/v1")
+          .withKind("Deployment")
+          .withName(clusterId)
+          .withUid(deploymentUid)
+          .withController(true)
+          .withBlockOwnerDeletion(true)
+          .build()
+
+        val ingress = new io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder()
+          .withNewMetadata()
+          .withName(clusterId)
+          .addToAnnotations(annotMap.asJava)
+          .addToLabels(labelsMap.asJava)
+          .addToOwnerReferences(ownerReference)
+          .endMetadata()
+          .withNewSpec()
+          .addNewRule()
+          .withHost(domainName)
+          .withNewHttp()
+          .addNewPath()
+          .withPath(s"/$nameSpace/$clusterId/")
+          .withPathType("ImplementationSpecific")
+          .withNewBackend()
+          .withNewService()
+          .withName(s"$clusterId-rest")
+          .withNewPort()
+          .withName("rest")
+          .endPort()
+          .endService()
+          .endBackend()
+          .endPath()
+          .addNewPath()
+          .withPath(s"/$nameSpace/$clusterId" + "(/|$)(.*)")
+          .withPathType("ImplementationSpecific")
+          .withNewBackend()
+          .withNewService()
+          .withName(s"$clusterId-rest")
+          .withNewPort()
+          .withName("rest")
+          .endPort()
+          .endService()
+          .endBackend()
+          .endPath()
+          .endHttp()
+          .endRule()
+          .endSpec()
+          .build()
+        client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
+
+      case _ =>
+    }
+  }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 03d68b3d3..2dc756a0d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -19,11 +19,12 @@ package org.apache.streampark.flink.kubernetes.watcher
 
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, IngressController, JobStatusWatcherConfig, KubernetesRetriever}
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
 import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.kubernetes.model._
 
 import com.google.common.base.Charsets
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 5567cf5e9..5539222fe 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -17,6 +17,7 @@
 package org.apache.streampark.flink.kubernetes
 
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
+import org.apache.streampark.flink.kubernetes.ingress.{IngressStrategyV1, IngressStrategyV1beta1}
 import org.apache.streampark.flink.kubernetes.watcher.{Checkpoint, FlinkRestJmConfigItem, FlinkRestOverview, JobDetails}
 
 import com.google.common.base.Charsets
@@ -294,7 +295,7 @@ class FlinkRestJsonTest {
         |]
         |""".stripMargin
 
-    val ingressMeta = IngressMeta.as(json)
+    val ingressMeta = new IngressStrategyV1().IngressMeta.as(json)
     println(ingressMeta.get)
   }
 
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 93b8aab17..fc3fe4738 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -20,7 +20,8 @@ package org.apache.streampark.flink.packer.pipeline.impl
 import org.apache.streampark.common.enums.DevelopmentMode
 import org.apache.streampark.common.fs.LfsOperator
 import org.apache.streampark.common.util.ThreadUtils
-import org.apache.streampark.flink.kubernetes.{IngressController, PodTemplateTool}
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.packer.docker._
 import org.apache.streampark.flink.packer.maven.MavenTool
 import org.apache.streampark.flink.packer.pipeline._