You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2023/04/12 08:22:46 UTC

[incubator-streampark] 01/01: [Improve][kubernetes] ingress support ownerReferences

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch ingress
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 16bd7d3689cf053c3eab1bea19483184e36983ce
Author: monster <60...@users.noreply.github.com>
AuthorDate: Wed Apr 12 16:22:34 2023 +0800

    [Improve][kubernetes] ingress support ownerReferences
---
 .../flink/kubernetes/IngressController.scala       | 30 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
index 3e35b9240..60053d79f 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
@@ -21,19 +21,16 @@ import java.io.File
 import java.io.IOException
 import java.nio.file.Files
 import java.nio.file.Paths
-
 import scala.collection.JavaConverters._
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
-
-import io.fabric8.kubernetes.api.model.IntOrString
+import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
 import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.apache.commons.io.FileUtils
 import org.apache.flink.client.program.ClusterClient
 import org.json4s.{DefaultFormats, JArray}
 import org.json4s.jackson.JsonMethods.parse
-
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.common.util.Utils._
 
@@ -50,11 +47,36 @@ object IngressController extends Logger {
           "app" -> clusterId,
           "type" -> "flink-native-kubernetes",
           "component" -> "ingress")
+
+        val deployment = client
+          .apps()
+          .deployments()
+          .inNamespace(nameSpace)
+          .withName(clusterId)
+          .get()
+
+        val deploymentUid = if (deployment != null) {
+          deployment.getMetadata.getUid
+        } else {
+          throw new RuntimeException(s"Deployment with name $clusterId not found in namespace $nameSpace")
+        }
+
+        // Create OwnerReference object
+        val ownerReference = new OwnerReferenceBuilder()
+          .withApiVersion("apps/v1")
+          .withKind("Deployment")
+          .withName(clusterId)
+          .withUid(deploymentUid)
+          .withController(true)
+          .withBlockOwnerDeletion(true)
+          .build()
+
         val ingress = new IngressBuilder()
           .withNewMetadata()
           .withName(clusterId)
           .addToAnnotations(annotMap.asJava)
           .addToLabels(labelsMap.asJava)
+          .addToOwnerReferences(ownerReference) // Add OwnerReference
           .endMetadata()
           .withNewSpec()
           .addNewRule()