You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:33:57 UTC
[flink-kubernetes-operator] 04/23: Session cluster + local debugging support
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit e48935c27e51b1450511a74a88d6f128e8617457
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Tue Feb 1 10:35:05 2022 +0100
Session cluster + local debugging support
---
README.md | 47 ++++++++----
deploy/flink-operator.yaml | 1 -
examples/basic-session.yaml | 21 +++++
examples/pod-template.yaml | 12 ++-
.../controller/FlinkDeploymentController.java | 89 +++++++++++++++-------
.../kubernetes/operator/crd/FlinkDeployment.java | 2 +
.../flink/kubernetes/operator/utils/Constants.java | 6 --
.../kubernetes/operator/utils/FlinkUtils.java | 21 ++++-
8 files changed, 148 insertions(+), 51 deletions(-)
diff --git a/README.md b/README.md
index b0b58f9..19873e2 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
# flink-kubernetes-operator
-Temporary repository for Flink Kubernetes Operator. The content will be moved to OSS repo once created and IPR.
+Temporary repository for Flink Kubernetes Operator. The content will be moved to OSS repo once created and IPR. Check [FLIP-212](https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator) for further info.
## How to Build
```
@@ -7,9 +7,9 @@ mvn clean install
```
## How to Run
-* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply:
+* Make Sure that `FlinkDeployment` Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply:
```
-k apply -f target/classes/META-INF/fabric8/flinkapplications.flink.io-v1.yml
+k apply -f target/classes/META-INF/fabric8/flinkdeployments.flink.io-v1.yml
```
* (Optional) Build Docker Image
```
@@ -20,28 +20,47 @@ docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest
kubectl apply -f deploy/rbac.yaml
kubectl apply -f deploy/flink-operator.yaml
```
-* Create a new Flink application
-The flink-operator will watch the CRD resources and submit a new Flink application once the CR it applied.
+* Create a new Flink deployment
+The flink-operator will watch the CRD resources and submit a new Flink deployment once the CR is applied.
```
-kubectl apply -f deploy/cr.yaml
+kubectl apply -f deploy/basic.yaml
```
-* Delete a Flink application
+* Delete a Flink deployment
```
-kubectl delete -f deploy/cr.yaml
+kubectl delete -f deploy/basic.yaml
OR
-kubectl delete flinkapp {app_name}
+kubectl delete flinkdep {dep_name}
```
-* Get/List Flink applications
-Get all the Flink applications running in the K8s cluster
+* Get/List Flink deployments
+Get all the Flink deployments running in the K8s cluster
```
-kubectl get flinkapp
+kubectl get flinkdep
```
-Describe a specific Flink application to show the status(including job status, savepoint, ect.)
+Describe a specific Flink deployment to show the status (including job status, savepoint, etc.)
```
-kubectl describe flinkapp {app_name}
+kubectl describe flinkdep {dep_name}
```
+## How to Debug
+You can run or debug the `FlinkOperator` from your preferred IDE. The operator itself is accessing the deployed Flink clusters through the REST interface. When running locally the `rest.port` and `rest.address` Flink configuration parameters must be modified to a locally accessible value.
+
+When using `minikube tunnel` the rest service is exposed on `localhost:8081`
+```
+> minikube tunnel
+
+> kubectl get services
+NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
+basic-session-example ClusterIP None <none> 6123/TCP,6124/TCP 14h
+basic-session-example-rest LoadBalancer 10.96.36.250 127.0.0.1 8081:30572/TCP 14h
+```
+The operator picks up the default log and Flink configurations from `/opt/flink/conf`. You can put the rest configuration parameters here:
+```
+cat /opt/flink/conf/flink-conf.yaml
+rest.port: 8081
+rest.address: localhost
+```
+
diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml
index c2698c7..bddf360 100644
--- a/deploy/flink-operator.yaml
+++ b/deploy/flink-operator.yaml
@@ -43,7 +43,6 @@ metadata:
app: flink
data:
flink-conf.yaml: |+
- jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml
new file mode 100644
index 0000000..b332391
--- /dev/null
+++ b/examples/basic-session.yaml
@@ -0,0 +1,21 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+ namespace: default
+ name: basic-session-example
+spec:
+ image: flink:1.14.3
+ flinkVersion: 1.14.3
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
+ kubernetes.jobmanager.service-account: flink-operator
+ jobManager:
+ replicas: 1
+ resource:
+ memory: "2048m"
+ cpu: 1
+ taskManager:
+ taskSlots: 2
+ resource:
+ memory: "2048m"
+ cpu: 1
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index 02fea81..1ce72cd 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -9,7 +9,6 @@ spec:
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
kubernetes.jobmanager.service-account: flink-operator
- kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
podTemplate:
apiVersion: v1
kind: Pod
@@ -42,6 +41,17 @@ spec:
resource:
memory: "2048m"
cpu: 1
+ podTemplate:
+ apiVersion: v1
+ kind: Pod
+ metadata:
+ name: task-manager-pod-template
+ spec:
+ initContainers:
+ # Sample sidecar container
+ - name: busybox
+ image: busybox:latest
+ command: [ 'sh','-c','echo hello from task manager' ]
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index ce73e85..9a64f04 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -2,7 +2,9 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
@@ -83,11 +85,15 @@ public class FlinkDeploymentController
final Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
- if (flinkApp.getStatus() == null
- && flinkApp.getSpec().getJob().getState().equals(JobState.RUNNING)) {
+ if (flinkApp.getStatus() == null) {
try {
flinkApp.setStatus(new FlinkDeploymentStatus());
- deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+ if (flinkApp.getSpec().getJob() != null
+ && flinkApp.getSpec().getJob().getState().equals(JobState.RUNNING)) {
+ deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+ } else {
+ deployFlinkSession(flinkApp, effectiveConfig);
+ }
} catch (Exception e) {
LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
return UpdateControl.<FlinkDeployment>noUpdate()
@@ -133,12 +139,28 @@ public class FlinkDeploymentController
flinkApp.getSpec().getJob().getEntryClass());
deployer.run(effectiveConfig, applicationConfiguration);
- flinkApp.getStatus().setJobStatuses(null);
LOG.info("{} deployed", flinkApp.getMetadata().getName());
}
+ private void deployFlinkSession(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+ LOG.info("Deploying session cluster {}", flinkApp.getMetadata().getName());
+ final ClusterClientServiceLoader clusterClientServiceLoader =
+ new DefaultClusterClientServiceLoader();
+ final ClusterClientFactory<String> kubernetesClusterClientFactory =
+ clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
+ try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
+ kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfig)) {
+ kubernetesClusterDescriptor.deploySessionCluster(
+ kubernetesClusterClientFactory.getClusterSpecification(effectiveConfig));
+ } catch (Exception e) {
+ LOG.error("Failed to deploy {}", flinkApp.getMetadata().getName(), e);
+ }
+ LOG.info("Session cluster {} deployed", flinkApp.getMetadata().getName());
+ }
+
private boolean updateFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
- if (!flinkApp.getStatus().getSpec().getJob().getState().equals(JobState.RUNNING)) {
+ if (flinkApp.getStatus().getSpec().getJob() == null
+ || !flinkApp.getStatus().getSpec().getJob().getState().equals(JobState.RUNNING)) {
return true;
}
LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
@@ -202,32 +224,43 @@ public class FlinkDeploymentController
boolean specChanged = !flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
if (specChanged) {
- JobState currentJobState = flinkApp.getStatus().getSpec().getJob().getState();
- JobState desiredJobState = flinkApp.getSpec().getJob().getState();
- UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
- if (currentJobState == JobState.RUNNING) {
- if (desiredJobState == JobState.RUNNING) {
- upgradeFlinkJob(flinkApp, effectiveConfig);
+ if (flinkApp.getStatus().getSpec().getJob() != null) {
+ if (flinkApp.getSpec().getJob() == null) {
+ throw new RuntimeException("Cannot switch from job to session cluster");
}
- if (desiredJobState.equals(JobState.SUSPENDED)) {
- if (upgradeMode == UpgradeMode.STATELESS) {
- cancelJob(flinkApp, effectiveConfig);
- } else {
- suspendJob(flinkApp, effectiveConfig);
+ JobState currentJobState = flinkApp.getStatus().getSpec().getJob().getState();
+ JobState desiredJobState = flinkApp.getSpec().getJob().getState();
+
+ UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
+ if (currentJobState == JobState.RUNNING) {
+ if (desiredJobState == JobState.RUNNING) {
+ upgradeFlinkJob(flinkApp, effectiveConfig);
+ }
+ if (desiredJobState.equals(JobState.SUSPENDED)) {
+ if (upgradeMode == UpgradeMode.STATELESS) {
+ cancelJob(flinkApp, effectiveConfig);
+ } else {
+ suspendJob(flinkApp, effectiveConfig);
+ }
}
}
- }
- if (currentJobState == JobState.SUSPENDED) {
- if (desiredJobState == JobState.RUNNING) {
- if (upgradeMode == UpgradeMode.STATELESS) {
- deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
- } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
- restoreFromLastSavepoint(flinkApp, effectiveConfig);
- } else {
- throw new UnsupportedOperationException(
- "Only savepoint and stateless strategies are supported at the moment.");
+ if (currentJobState == JobState.SUSPENDED) {
+ if (desiredJobState == JobState.RUNNING) {
+ if (upgradeMode == UpgradeMode.STATELESS) {
+ deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+ } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
+ restoreFromLastSavepoint(flinkApp, effectiveConfig);
+ } else {
+ throw new UnsupportedOperationException(
+ "Only savepoint and stateless strategies are supported at the moment.");
+ }
}
}
+ } else {
+ if (flinkApp.getSpec().getJob() != null) {
+ throw new RuntimeException("Cannot switch from session to job cluster");
+ }
+ upgradeSessionCluster(flinkApp, effectiveConfig);
}
}
}
@@ -318,6 +351,10 @@ public class FlinkDeploymentController
}
}
+ private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
@Override
public List<EventSource> prepareEventSources(
EventSourceContext<FlinkDeployment> eventSourceContext) {
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index 505fcae..b32aa10 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;
/** Flink deployment object (spec + status). */
@@ -15,5 +16,6 @@ import io.fabric8.kubernetes.model.annotation.Version;
@JsonDeserialize()
@Group("flink.io")
@Version("v1alpha1")
+@ShortNames({"flinkdep"})
public class FlinkDeployment extends CustomResource<FlinkDeploymentSpec, FlinkDeploymentStatus>
implements Namespaced {}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/Constants.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/Constants.java
deleted file mode 100644
index e113144..0000000
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/Constants.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.flink.kubernetes.operator.utils;
-
-/** Constants used by the operator. */
-public class Constants {
- public static final String KUBERNETES_APP_TARGET = "kubernetes-application";
-}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index a5a04d2..f31c4d4 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -12,6 +12,7 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -20,6 +21,8 @@ import org.apache.flink.util.StringUtils;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.internal.SerializationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -30,6 +33,8 @@ import java.util.Collections;
/** Flink Utility methods used by the operator. */
public class FlinkUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+
public static Configuration getEffectiveConfig(
String namespace, String clusterId, FlinkDeploymentSpec spec) {
try {
@@ -44,7 +49,14 @@ public class FlinkUtils {
effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
- effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET);
+
+ if (spec.getJob() != null) {
+ effectiveConfig.set(
+ DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
+ } else {
+ effectiveConfig.set(
+ DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
+ }
if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
@@ -136,8 +148,11 @@ public class FlinkUtils {
final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
final int port = config.getInteger(RestOptions.PORT);
- final String restServerAddress =
- String.format("http://%s-rest.%s:%s", clusterId, namespace, port);
+ final String host =
+ config.getString(
+ RestOptions.ADDRESS, String.format("%s-rest.%s", clusterId, namespace));
+ final String restServerAddress = String.format("http://%s:%s", host, port);
+ LOG.info("Creating RestClusterClient({})", restServerAddress);
return new RestClusterClient<>(
config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
}