Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:33:57 UTC

[flink-kubernetes-operator] 04/23: Session cluster + local debugging support

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit e48935c27e51b1450511a74a88d6f128e8617457
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Tue Feb 1 10:35:05 2022 +0100

    Session cluster + local debugging support
---
 README.md                                          | 47 ++++++++----
 deploy/flink-operator.yaml                         |  1 -
 examples/basic-session.yaml                        | 21 +++++
 examples/pod-template.yaml                         | 12 ++-
 .../controller/FlinkDeploymentController.java      | 89 +++++++++++++++-------
 .../kubernetes/operator/crd/FlinkDeployment.java   |  2 +
 .../flink/kubernetes/operator/utils/Constants.java |  6 --
 .../kubernetes/operator/utils/FlinkUtils.java      | 21 ++++-
 8 files changed, 148 insertions(+), 51 deletions(-)

diff --git a/README.md b/README.md
index b0b58f9..19873e2 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
 # flink-kubernetes-operator
-Temporary repository for Flink Kubernetes Operator. The content will be moved to OSS repo once created and IPR.
+Temporary repository for the Flink Kubernetes Operator. The content will be moved to the OSS repo once it is created and the IPR process is complete. Check [FLIP-212](https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator) for further info.
 
 ## How to Build
 ```
@@ -7,9 +7,9 @@ mvn clean install
 ```
 
 ## How to Run
-* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply:
+* Make sure that the `FlinkDeployment` Custom Resource Definition is already applied to the cluster. If not, issue the following command to apply it:
 ```
-k apply -f target/classes/META-INF/fabric8/flinkapplications.flink.io-v1.yml
+k apply -f target/classes/META-INF/fabric8/flinkdeployments.flink.io-v1.yml
 ```
 * (Optional) Build Docker Image
 ```
@@ -20,28 +20,47 @@ docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest
 kubectl apply -f deploy/rbac.yaml
 kubectl apply -f deploy/flink-operator.yaml
 ```
-* Create a new Flink application
-The flink-operator will watch the CRD resources and submit a new Flink application once the CR it applied.
+* Create a new Flink deployment
+The flink-operator watches the CRD resources and submits a new Flink deployment once the CR is applied (a sketch of a possible `deploy/basic.yaml` follows the command below).
 ```
-kubectl apply -f deploy/cr.yaml
+kubectl apply -f deploy/basic.yaml
 ```
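The referenced `deploy/basic.yaml` is not included in this commit. As a rough sketch only, an application-mode deployment would combine the fields from `examples/basic-session.yaml` with a `job` section like the one in `examples/pod-template.yaml`; the values below are illustrative, not the actual file contents:
```
# Illustrative sketch only - not the actual deploy/basic.yaml from the repo
apiVersion: flink.io/v1alpha1
kind: FlinkDeployment
metadata:
  namespace: default
  name: basic-example
spec:
  image: flink:1.14.3
  flinkVersion: 1.14.3
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.jobmanager.service-account: flink-operator
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    taskSlots: 2
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
```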
 
-* Delete a Flink application
+* Delete a Flink deployment
 ```
-kubectl delete -f deploy/cr.yaml
+kubectl delete -f deploy/basic.yaml
 
 OR
 
-kubectl delete flinkapp {app_name}
+kubectl delete flinkdep {dep_name}
 ```
 
-* Get/List Flink applications
-Get all the Flink applications running in the K8s cluster
+* Get/List Flink deployments
+Get all the Flink deployments running in the K8s cluster
 ```
-kubectl get flinkapp
+kubectl get flinkdep
 ```
 
-Describe a specific Flink application to show the status(including job status, savepoint, ect.)
+Describe a specific Flink deployment to show its status (including job status, savepoints, etc.)
 ```
-kubectl describe flinkapp {app_name}
+kubectl describe flinkdep {dep_name}
 ```
+## How to Debug
+You can run or debug the `FlinkOperator` from your preferred IDE. The operator accesses the deployed Flink clusters through the REST interface. When running locally, the `rest.port` and `rest.address` Flink configuration parameters must point to a locally accessible address.
+
+When using `minikube tunnel`, the rest service is exposed on `localhost:8081`:
+```
+> minikube tunnel
+
+> kubectl get services
+NAME                         TYPE           CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
+basic-session-example        ClusterIP      None           <none>        6123/TCP,6124/TCP   14h
+basic-session-example-rest   LoadBalancer   10.96.36.250   127.0.0.1     8081:30572/TCP      14h
+```
+The operator picks up the default log and Flink configurations from `/opt/flink/conf`. You can put the REST configuration parameters there:
+```
+cat /opt/flink/conf/flink-conf.yaml
+rest.port: 8081
+rest.address: localhost
+```
+
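If `minikube tunnel` is not an option, forwarding the rest service directly should also work. A sketch, assuming the `basic-session-example` deployment shown above (the service name follows the `<cluster-id>-rest` pattern):
```
# Forward the JobManager REST port of the session cluster to localhost:8081
kubectl port-forward service/basic-session-example-rest 8081:8081
```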
diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml
index c2698c7..bddf360 100644
--- a/deploy/flink-operator.yaml
+++ b/deploy/flink-operator.yaml
@@ -43,7 +43,6 @@ metadata:
     app: flink
 data:
   flink-conf.yaml: |+
-    jobmanager.rpc.address: flink-jobmanager
     taskmanager.numberOfTaskSlots: 2
     blob.server.port: 6124
     jobmanager.rpc.port: 6123
diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml
new file mode 100644
index 0000000..b332391
--- /dev/null
+++ b/examples/basic-session.yaml
@@ -0,0 +1,21 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: basic-session-example
+spec:
+  image: flink:1.14.3
+  flinkVersion: 1.14.3
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
+    kubernetes.jobmanager.service-account: flink-operator
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    taskSlots: 2
+    resource:
+      memory: "2048m"
+      cpu: 1
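One possible way to exercise the new session mode end to end, using the commands from the README above (the deployment name comes from this example file):
```
kubectl apply -f examples/basic-session.yaml
kubectl get flinkdep
kubectl describe flinkdep basic-session-example
```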
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index 02fea81..1ce72cd 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -9,7 +9,6 @@ spec:
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
     kubernetes.jobmanager.service-account: flink-operator
-    kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
   podTemplate:
     apiVersion: v1
     kind: Pod
@@ -42,6 +41,17 @@ spec:
     resource:
       memory: "2048m"
       cpu: 1
+    podTemplate:
+      apiVersion: v1
+      kind: Pod
+      metadata:
+        name: task-manager-pod-template
+      spec:
+        initContainers:
+          # Sample sidecar container
+          - name: busybox
+            image: busybox:latest
+            command: [ 'sh','-c','echo hello from task manager' ]
   job:
     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
     parallelism: 2
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index ce73e85..9a64f04 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -2,7 +2,9 @@ package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
@@ -83,11 +85,15 @@ public class FlinkDeploymentController
 
         final Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
-        if (flinkApp.getStatus() == null
-                && flinkApp.getSpec().getJob().getState().equals(JobState.RUNNING)) {
+        if (flinkApp.getStatus() == null) {
             try {
                 flinkApp.setStatus(new FlinkDeploymentStatus());
-                deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                if (flinkApp.getSpec().getJob() != null
+                        && flinkApp.getSpec().getJob().getState().equals(JobState.RUNNING)) {
+                    deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                } else {
+                    deployFlinkSession(flinkApp, effectiveConfig);
+                }
             } catch (Exception e) {
                 LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
                 return UpdateControl.<FlinkDeployment>noUpdate()
@@ -133,12 +139,28 @@ public class FlinkDeploymentController
                         flinkApp.getSpec().getJob().getEntryClass());
 
         deployer.run(effectiveConfig, applicationConfiguration);
-        flinkApp.getStatus().setJobStatuses(null);
         LOG.info("{} deployed", flinkApp.getMetadata().getName());
     }
 
+    private void deployFlinkSession(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        LOG.info("Deploying session cluster {}", flinkApp.getMetadata().getName());
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ClusterClientFactory<String> kubernetesClusterClientFactory =
+                clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
+        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
+                kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfig)) {
+            kubernetesClusterDescriptor.deploySessionCluster(
+                    kubernetesClusterClientFactory.getClusterSpecification(effectiveConfig));
+        } catch (Exception e) {
+            LOG.error("Failed to deploy {}", flinkApp.getMetadata().getName(), e);
+        }
+        LOG.info("Session cluster {} deployed", flinkApp.getMetadata().getName());
+    }
+
     private boolean updateFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        if (!flinkApp.getStatus().getSpec().getJob().getState().equals(JobState.RUNNING)) {
+        if (flinkApp.getStatus().getSpec().getJob() == null
+                || !flinkApp.getStatus().getSpec().getJob().getState().equals(JobState.RUNNING)) {
             return true;
         }
         LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
@@ -202,32 +224,43 @@ public class FlinkDeploymentController
         boolean specChanged = !flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
 
         if (specChanged) {
-            JobState currentJobState = flinkApp.getStatus().getSpec().getJob().getState();
-            JobState desiredJobState = flinkApp.getSpec().getJob().getState();
-            UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
-            if (currentJobState == JobState.RUNNING) {
-                if (desiredJobState == JobState.RUNNING) {
-                    upgradeFlinkJob(flinkApp, effectiveConfig);
+            if (flinkApp.getStatus().getSpec().getJob() != null) {
+                if (flinkApp.getSpec().getJob() == null) {
+                    throw new RuntimeException("Cannot switch from job to session cluster");
                 }
-                if (desiredJobState.equals(JobState.SUSPENDED)) {
-                    if (upgradeMode == UpgradeMode.STATELESS) {
-                        cancelJob(flinkApp, effectiveConfig);
-                    } else {
-                        suspendJob(flinkApp, effectiveConfig);
+                JobState currentJobState = flinkApp.getStatus().getSpec().getJob().getState();
+                JobState desiredJobState = flinkApp.getSpec().getJob().getState();
+
+                UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
+                if (currentJobState == JobState.RUNNING) {
+                    if (desiredJobState == JobState.RUNNING) {
+                        upgradeFlinkJob(flinkApp, effectiveConfig);
+                    }
+                    if (desiredJobState.equals(JobState.SUSPENDED)) {
+                        if (upgradeMode == UpgradeMode.STATELESS) {
+                            cancelJob(flinkApp, effectiveConfig);
+                        } else {
+                            suspendJob(flinkApp, effectiveConfig);
+                        }
                     }
                 }
-            }
-            if (currentJobState == JobState.SUSPENDED) {
-                if (desiredJobState == JobState.RUNNING) {
-                    if (upgradeMode == UpgradeMode.STATELESS) {
-                        deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
-                    } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
-                        restoreFromLastSavepoint(flinkApp, effectiveConfig);
-                    } else {
-                        throw new UnsupportedOperationException(
-                                "Only savepoint and stateless strategies are supported at the moment.");
+                if (currentJobState == JobState.SUSPENDED) {
+                    if (desiredJobState == JobState.RUNNING) {
+                        if (upgradeMode == UpgradeMode.STATELESS) {
+                            deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                        } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
+                            restoreFromLastSavepoint(flinkApp, effectiveConfig);
+                        } else {
+                            throw new UnsupportedOperationException(
+                                    "Only savepoint and stateless strategies are supported at the moment.");
+                        }
                     }
                 }
+            } else {
+                if (flinkApp.getSpec().getJob() != null) {
+                    throw new RuntimeException("Cannot switch from session to job cluster");
+                }
+                upgradeSessionCluster(flinkApp, effectiveConfig);
             }
         }
     }
@@ -318,6 +351,10 @@ public class FlinkDeploymentController
         }
     }
 
+    private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public List<EventSource> prepareEventSources(
             EventSourceContext<FlinkDeployment> eventSourceContext) {
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index 505fcae..b32aa10 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import io.fabric8.kubernetes.api.model.Namespaced;
 import io.fabric8.kubernetes.client.CustomResource;
 import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.ShortNames;
 import io.fabric8.kubernetes.model.annotation.Version;
 
 /** Flink deployment object (spec + status). */
@@ -15,5 +16,6 @@ import io.fabric8.kubernetes.model.annotation.Version;
 @JsonDeserialize()
 @Group("flink.io")
 @Version("v1alpha1")
+@ShortNames({"flinkdep"})
 public class FlinkDeployment extends CustomResource<FlinkDeploymentSpec, FlinkDeploymentStatus>
         implements Namespaced {}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/Constants.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/Constants.java
deleted file mode 100644
index e113144..0000000
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/Constants.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.flink.kubernetes.operator.utils;
-
-/** Constants used by the operator. */
-public class Constants {
-    public static final String KUBERNETES_APP_TARGET = "kubernetes-application";
-}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index a5a04d2..f31c4d4 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -12,6 +12,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -20,6 +21,8 @@ import org.apache.flink.util.StringUtils;
 
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.internal.SerializationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,6 +33,8 @@ import java.util.Collections;
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+
     public static Configuration getEffectiveConfig(
             String namespace, String clusterId, FlinkDeploymentSpec spec) {
         try {
@@ -44,7 +49,14 @@ public class FlinkUtils {
 
             effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
             effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-            effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET);
+
+            if (spec.getJob() != null) {
+                effectiveConfig.set(
+                        DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
+            } else {
+                effectiveConfig.set(
+                        DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
+            }
 
             if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
                 effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
@@ -136,8 +148,11 @@ public class FlinkUtils {
         final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
         final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
         final int port = config.getInteger(RestOptions.PORT);
-        final String restServerAddress =
-                String.format("http://%s-rest.%s:%s", clusterId, namespace, port);
+        final String host =
+                config.getString(
+                        RestOptions.ADDRESS, String.format("%s-rest.%s", clusterId, namespace));
+        final String restServerAddress = String.format("http://%s:%s", host, port);
+        LOG.info("Creating RestClusterClient({})", restServerAddress);
         return new RestClusterClient<>(
                 config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
     }
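For context, a minimal sketch of how the REST client built above could be used to poll job statuses, similar to what `updateFlinkJobStatus` does in the controller. This is not part of the commit; it assumes the helper in the final hunk is exposed as `FlinkUtils.getRestClusterClient(Configuration)` (the method name is not visible in this diff) and that the REST endpoint is reachable as described in the "How to Debug" section:
```
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;

import java.util.Collection;

/** Sketch: list the jobs of a deployed cluster through its REST endpoint. */
public class JobStatusProbe {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Point the client at the locally exposed REST endpoint (see "How to Debug").
        config.setString("rest.address", "localhost");
        config.setInteger("rest.port", 8081);
        config.setString("kubernetes.cluster-id", "basic-session-example");
        config.setString("kubernetes.namespace", "default");

        // Assumes the helper is accessible; its body reads the values set above.
        try (RestClusterClient<String> client = FlinkUtils.getRestClusterClient(config)) {
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            jobs.forEach(j -> System.out.println(j.getJobId() + " -> " + j.getJobState()));
        }
    }
}
```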