You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2023/04/12 08:22:46 UTC
[incubator-streampark] 01/01: [Improve][kubernetes] ingress support ownerReferences
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch ingress
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 16bd7d3689cf053c3eab1bea19483184e36983ce
Author: monster <60...@users.noreply.github.com>
AuthorDate: Wed Apr 12 16:22:34 2023 +0800
[Improve][kubernetes] ingress support ownerReferences
---
.../flink/kubernetes/IngressController.scala | 30 +++++++++++++++++++---
1 file changed, 26 insertions(+), 4 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
index 3e35b9240..60053d79f 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
@@ -21,19 +21,16 @@ import java.io.File
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
-
import scala.collection.JavaConverters._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
-
-import io.fabric8.kubernetes.api.model.IntOrString
+import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.io.FileUtils
import org.apache.flink.client.program.ClusterClient
import org.json4s.{DefaultFormats, JArray}
import org.json4s.jackson.JsonMethods.parse
-
import org.apache.streampark.common.util.Logger
import org.apache.streampark.common.util.Utils._
@@ -50,11 +47,36 @@ object IngressController extends Logger {
"app" -> clusterId,
"type" -> "flink-native-kubernetes",
"component" -> "ingress")
+
+ val deployment = client
+ .apps()
+ .deployments()
+ .inNamespace(nameSpace)
+ .withName(clusterId)
+ .get()
+
+ val deploymentUid = if (deployment != null) {
+ deployment.getMetadata.getUid
+ } else {
+ throw new RuntimeException(s"Deployment with name $clusterId not found in namespace $nameSpace")
+ }
+
+ // Owner reference to the Deployment named clusterId (controller, blocks owner deletion); attached to the ingress metadata below
+ val ownerReference = new OwnerReferenceBuilder()
+ .withApiVersion("apps/v1")
+ .withKind("Deployment")
+ .withName(clusterId)
+ .withUid(deploymentUid)
+ .withController(true)
+ .withBlockOwnerDeletion(true)
+ .build()
+
val ingress = new IngressBuilder()
.withNewMetadata()
.withName(clusterId)
.addToAnnotations(annotMap.asJava)
.addToLabels(labelsMap.asJava)
+ .addToOwnerReferences(ownerReference) // Add OwnerReference
.endMetadata()
.withNewSpec()
.addNewRule()