You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:33:55 UTC
[flink-kubernetes-operator] 02/23: CRD alignment + first working flink deployment
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit f3d12d6535b93a4002ba30fdb00573276d191fad
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Thu Jan 27 20:02:00 2022 +0100
CRD alignment + first working flink deployment
---
README.md | 10 +-
deploy/flink-operator.yaml | 82 +---------
deploy/rbac.yaml | 76 +++++++++
examples/basic.yaml | 27 +++
examples/cr.yaml | 21 ---
examples/pod-template.yaml | 49 ++++++
pom.xml | 2 +-
...sOperatorEntrypoint.java => FlinkOperator.java} | 8 +-
.../kubernetes/operator/Utils/FlinkUtils.java | 181 ++++++++++++---------
.../controller/FlinkApplicationController.java | 169 -------------------
.../controller/FlinkDeploymentController.java | 123 ++++++++++++++
...{FlinkApplication.java => FlinkDeployment.java} | 6 +-
...plicationList.java => FlinkDeploymentList.java} | 2 +-
.../kubernetes/operator/crd/spec/CancelMode.java | 10 ++
.../operator/crd/spec/FlinkApplicationSpec.java | 30 ----
.../operator/crd/spec/FlinkDeploymentSpec.java | 22 +++
.../operator/crd/spec/JobManagerSpec.java | 14 ++
.../kubernetes/operator/crd/spec/JobSpec.java | 16 ++
.../kubernetes/operator/crd/spec/Resource.java | 4 +-
.../kubernetes/operator/crd/spec/RestoreMode.java | 12 ++
.../operator/crd/spec/TaskManagerSpec.java | 14 ++
...ationStatus.java => FlinkDeploymentStatus.java} | 2 +-
src/main/resources/log4j2.properties | 2 +-
23 files changed, 489 insertions(+), 393 deletions(-)
diff --git a/README.md b/README.md
index 845b97d..b0b58f9 100644
--- a/README.md
+++ b/README.md
@@ -7,17 +7,17 @@ mvn clean install
```
## How to Run
-* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. The CRD could be find [here](deploy/crd.yaml). If not, issue the following commands to apply:
+* Make sure that the FlinkDeployment Custom Resource Definition is already applied to the cluster. If not, issue the following command to apply it:
```
-kubectl apply -f deploy/crd.yaml
+kubectl apply -f target/classes/META-INF/fabric8/flinkdeployments.flink.io-v1.yml
```
-* Build Docker Image
+* (Optional) Build Docker Image
```
docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest
```
-* Start flink-operator deployment
-A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services.
+* Start flink-operator deployment. A new `ServiceAccount` "flink-operator" will be created with sufficient permissions to create/list pods and services.
```
+kubectl apply -f deploy/rbac.yaml
kubectl apply -f deploy/flink-operator.yaml
```
* Create a new Flink application
diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml
index 564ed12..c2698c7 100644
--- a/deploy/flink-operator.yaml
+++ b/deploy/flink-operator.yaml
@@ -15,7 +15,7 @@ spec:
serviceAccountName: flink-operator
containers:
- name: flink-operator
- image: docker.apple.com/gyula_fora/flink-java-operator:latest
+ image: docker.apple.com/matyas_orhidi/flink-java-operator:latest
imagePullPolicy: Always
env:
- name: FLINK_CONF_DIR
@@ -26,7 +26,7 @@ spec:
volumes:
- name: flink-config-volume
configMap:
- name: flink-config
+ name: flink-operator-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
@@ -38,7 +38,7 @@ spec:
apiVersion: v1
kind: ConfigMap
metadata:
- name: flink-config
+ name: flink-operator-config
labels:
app: flink
data:
@@ -97,79 +97,3 @@ data:
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
-
----
-
-apiVersion: v1
-kind: ServiceAccount
-metadata:
- name: flink-operator
-
----
-
-apiVersion: v1
-kind: ClusterRole
-apiVersion: rbac.authorization.k8s.io/v1
-metadata:
- name: flink-operator
-rules:
-- apiGroups:
- - flink-operator
- resources:
- - "*"
- verbs:
- - "*"
-- apiGroups:
- - ""
- resources:
- - pods
- - services
- - endpoints
- - persistentvolumeclaims
- - events
- - configmaps
- - secrets
- verbs:
- - "*"
-- apiGroups:
- - apps
- resources:
- - deployments
- - replicasets
- verbs:
- - "*"
-- apiGroups:
- - extensions
- resources:
- - deployments
- - ingresses
- verbs:
- - "*"
-- apiGroups:
- - flink.io
- resources:
- - flinkapplications
- verbs:
- - "*"
-- apiGroups:
- - networking.k8s.io
- resources:
- - ingresses
- verbs:
- - "*"
-
----
-
-apiVersion: v1
-kind: ClusterRoleBinding
-apiVersion: rbac.authorization.k8s.io/v1
-metadata:
- name: flink-operator-cluster-role-binding
-subjects:
-- kind: ServiceAccount
- name: flink-operator
- namespace: default
-roleRef:
- kind: ClusterRole
- name: flink-operator
- apiGroup: rbac.authorization.k8s.io
diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml
new file mode 100644
index 0000000..062ecab
--- /dev/null
+++ b/deploy/rbac.yaml
@@ -0,0 +1,76 @@
+---
+
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: flink-operator
+
+---
+
+apiVersion: v1
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: flink-operator
+rules:
+- apiGroups:
+ - flink-operator
+ resources:
+ - "*"
+ verbs:
+ - "*"
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ - services
+ - endpoints
+ - persistentvolumeclaims
+ - events
+ - configmaps
+ - secrets
+ verbs:
+ - "*"
+- apiGroups:
+ - apps
+ resources:
+ - deployments
+ - replicasets
+ verbs:
+ - "*"
+- apiGroups:
+ - extensions
+ resources:
+ - deployments
+ - ingresses
+ verbs:
+ - "*"
+- apiGroups:
+ - flink.io
+ resources:
+ - flinkdeployments
+ - flinkdeployments/status
+ verbs:
+ - "*"
+- apiGroups:
+ - networking.k8s.io
+ resources:
+ - ingresses
+ verbs:
+ - "*"
+
+---
+
+apiVersion: v1
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: flink-operator-cluster-role-binding
+subjects:
+- kind: ServiceAccount
+ name: flink-operator
+ namespace: default
+roleRef:
+ kind: ClusterRole
+ name: flink-operator
+ apiGroup: rbac.authorization.k8s.io
diff --git a/examples/basic.yaml b/examples/basic.yaml
new file mode 100644
index 0000000..9fdc289
--- /dev/null
+++ b/examples/basic.yaml
@@ -0,0 +1,27 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+ namespace: default
+ name: basic-example
+spec:
+ image: flink:1.14.3
+ flinkVersion: 1.14.3
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
+ kubernetes.jobmanager.service-account: flink-operator
+ kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
+ jobManager:
+ replicas: 1
+ resource:
+ memory: "2048m"
+ cpu: 1
+ taskManager:
+ taskSlots: 2
+ resource:
+ memory: "2048m"
+ cpu: 1
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 2
+ cancelMode: none
+ restoreMode: none
diff --git a/examples/cr.yaml b/examples/cr.yaml
deleted file mode 100644
index 9695be0..0000000
--- a/examples/cr.yaml
+++ /dev/null
@@ -1,21 +0,0 @@
-apiVersion: flink.io/v1alpha1
-kind: FlinkApplication
-metadata:
- namespace: default
- name: flink-example-statemachine
-spec:
- imageName: flink:latest
- jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
- parallelism: 1
- jobManagerResource:
- mem: 2048m
- cpu: 1
- taskManagerResource:
- mem: 2048m
- cpu: 1
- savepointsDir: file:///tmp/savepoints
- savepointGeneration: 0
- flinkConfig:
- taskmanager.numberOfTaskSlots: 2
- kubernetes.jobmanager.service-account: flink-operator
- kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
new file mode 100644
index 0000000..02fea81
--- /dev/null
+++ b/examples/pod-template.yaml
@@ -0,0 +1,49 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+ namespace: default
+ name: pod-template-example
+spec:
+ image: flink:1.14.3
+ flinkVersion: 1.14.3
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
+ kubernetes.jobmanager.service-account: flink-operator
+ kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
+ podTemplate:
+ apiVersion: v1
+ kind: Pod
+ metadata:
+ name: pod-template
+ spec:
+ containers:
+ # Do not change the main container name
+ - name: flink-main-container
+ volumeMounts:
+ - mountPath: /opt/flink/log
+ name: flink-logs
+ # Sample sidecar container
+ - name: fluentbit
+ image: fluent/fluent-bit:1.8.12-debug
+ command: [ 'sh','-c','/fluent-bit/bin/fluent-bit -i tail -p path=/flink-logs/*.log -p multiline.parser=java -o stdout' ]
+ volumeMounts:
+ - mountPath: /flink-logs
+ name: flink-logs
+ volumes:
+ - name: flink-logs
+ emptyDir: { }
+ jobManager:
+ replicas: 1
+ resource:
+ memory: "2048m"
+ cpu: 1
+ taskManager:
+ taskSlots: 2
+ resource:
+ memory: "2048m"
+ cpu: 1
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 2
+ cancelMode: none
+ restoreMode: none
diff --git a/pom.xml b/pom.xml
index 029a069..647868a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,7 @@
</artifactSet>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.flink.kubernetes.operator.KubernetesOperatorEntrypoint</mainClass>
+ <mainClass>org.apache.flink.kubernetes.operator.FlinkOperator</mainClass>
</transformer>
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
similarity index 81%
rename from src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java
rename to src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index b7f705f..edb5b74 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -5,7 +5,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
-import org.apache.flink.kubernetes.operator.controller.FlinkApplicationController;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.takes.facets.fork.FkRegex;
@@ -18,8 +18,8 @@ import java.io.IOException;
/**
* Main Class for Flink native k8s operator.
*/
-public class KubernetesOperatorEntrypoint {
- private static final Logger LOG = LoggerFactory.getLogger(KubernetesOperatorEntrypoint.class);
+public class FlinkOperator {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
public static void main(String args[]) throws IOException {
@@ -33,7 +33,7 @@ public class KubernetesOperatorEntrypoint {
Operator operator = new Operator(client,
new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
.build());
- operator.register(new FlinkApplicationController(client, namespace));
+ operator.register(new FlinkDeploymentController(client, namespace));
operator.installShutdownHook();
operator.start();
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java
index 4a50a64..4a69696 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java
@@ -1,9 +1,9 @@
package org.apache.flink.kubernetes.operator.Utils;
-import org.apache.flink.client.deployment.StandaloneClientFactory;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.internal.SerializationUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -14,86 +14,115 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkApplicationSpec;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.util.StringUtils;
+import java.io.File;
+import java.io.IOException;
import java.net.URI;
+import java.nio.file.Files;
import java.util.Collections;
public class FlinkUtils {
- public static Configuration getEffectiveConfig(String namespace, String clusterId, FlinkApplicationSpec spec) throws Exception {
- final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
- final Configuration effectiveConfig;
- if (flinkConfDir != null) {
- effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir);
- } else {
- effectiveConfig = new Configuration();
- }
-
- // Basic config options
- final URI uri = new URI(spec.getJarURI());
- effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
- effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
- effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET);
- // Set rest service exposed type to clusterIP since we will use ingress to access the webui
- effectiveConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-
- // Image
- if (!StringUtils.isNullOrWhitespaceOnly(spec.getImageName())) {
- effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImageName());
- }
- if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) {
- effectiveConfig.set(
- KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
- KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy()));
- }
-
- // Jars
- effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
-
- // Parallelism and Resource
- if (spec.getParallelism() > 0) {
- effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, spec.getParallelism());
- }
- if (spec.getJobManagerResource() != null) {
- effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getJobManagerResource().getMem());
- effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, spec.getJobManagerResource().getCpu());
- }
- if (spec.getTaskManagerResource() != null) {
- effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getTaskManagerResource().getMem());
- effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, spec.getTaskManagerResource().getCpu());
- }
-
- // Savepoint
- if (!StringUtils.isNullOrWhitespaceOnly(spec.getFromSavepoint())) {
- effectiveConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, spec.getFromSavepoint());
- effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, spec.isAllowNonRestoredState());
- }
- if (!StringUtils.isNullOrWhitespaceOnly(spec.getSavepointsDir())) {
- effectiveConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, spec.getSavepointsDir());
- }
-
- // Dynamic configuration
- if (spec.getFlinkConfig() != null && !spec.getFlinkConfig().isEmpty()) {
- spec.getFlinkConfig().forEach(effectiveConfig::setString);
- }
-
- return effectiveConfig;
- }
-
-
- public static ClusterClient<String> getRestClusterClient(Configuration config) throws Exception {
- final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
- final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
- final int port = config.getInteger(RestOptions.PORT);
- final String restServerAddress = String.format("http://%s-rest.%s:%s", clusterId, namespace, port);
- return new RestClusterClient<>(
- config,
- clusterId,
- (c,e) -> new StandaloneClientHAServices(restServerAddress));
- }
+ public static Configuration getEffectiveConfig(String namespace, String clusterId, FlinkDeploymentSpec spec) throws Exception {
+ final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
+ final Configuration effectiveConfig;
+
+ if (flinkConfDir != null) {
+ effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir);
+ } else {
+ effectiveConfig = new Configuration();
+ }
+
+ effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
+ effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
+ effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET);
+
+ if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
+ effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
+ }
+
+ if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) {
+ effectiveConfig.set(
+ KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
+ KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy()));
+ }
+
+ if (spec.getFlinkConfiguration() != null && !spec.getFlinkConfiguration().isEmpty()) {
+ spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
+ }
+
+ // Pod template
+ if (spec.getPodTemplate() != null) {
+ effectiveConfig.set(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, createTempFile(spec.getPodTemplate()));
+ }
+
+ if (spec.getJobManager() != null) {
+ if (spec.getJobManager().getResource() != null) {
+ effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getJobManager().getResource().getMemory());
+ effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, spec.getJobManager().getResource().getCpu());
+ }
+
+ if (spec.getJobManager().getPodTemplate() != null) {
+ effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, createTempFile(spec.getJobManager().getPodTemplate()));
+ }
+ }
+
+ if (spec.getTaskManager() != null) {
+ if (spec.getTaskManager().getTaskSlots() > 0) {
+ effectiveConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, spec.getTaskManager().getTaskSlots());
+ }
+
+ if (spec.getTaskManager().getResource() != null) {
+ effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getTaskManager().getResource().getMemory());
+ effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, spec.getTaskManager().getResource().getCpu());
+ }
+
+ if (spec.getTaskManager().getPodTemplate() != null) {
+ effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, createTempFile(spec.getTaskManager().getPodTemplate()));
+ }
+ }
+
+ if (spec.getJob() != null) {
+ final URI uri = new URI(spec.getJob().getJarURI());
+ effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
+
+ if (spec.getJob().getParallelism() > 0) {
+ effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, spec.getJob().getParallelism());
+ }
+ }
+
+ return effectiveConfig;
+ }
+
+ private static String createTempFile(Pod podTemplate) throws IOException {
+ File tmp = File.createTempFile("podTemplate_", ".yaml");
+ Files.write(tmp.toPath(), SerializationUtils.dumpAsYaml(podTemplate).getBytes());
+ tmp.deleteOnExit();
+ return tmp.getAbsolutePath();
+ }
+
+ public static ClusterClient<String> getRestClusterClient(Configuration config) throws Exception {
+ final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
+ final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
+ final int port = config.getInteger(RestOptions.PORT);
+ final String restServerAddress = String.format("http://%s-rest.%s:%s", clusterId, namespace, port);
+ return new RestClusterClient<>(
+ config,
+ clusterId,
+ (c, e) -> new StandaloneClientHAServices(restServerAddress));
+ }
+
+ public static JobStatus convert(JobStatusMessage message) {
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobId(message.getJobId().toString());
+ jobStatus.setJobName(message.getJobName());
+ jobStatus.setState(message.getJobState().toString());
+ return jobStatus;
+ }
+
}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
deleted file mode 100644
index 54e47f0..0000000
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package org.apache.flink.kubernetes.operator.controller;
-
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
-import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
-import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
-import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-import io.javaoperatorsdk.operator.api.reconciler.*;
-import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.kubernetes.operator.Utils.FlinkUtils;
-import org.apache.flink.kubernetes.operator.Utils.Constants;
-import org.apache.flink.kubernetes.operator.Utils.KubernetesUtils;
-import org.apache.flink.kubernetes.operator.crd.FlinkApplication;
-import org.apache.flink.kubernetes.operator.crd.spec.Resource;
-import org.apache.flink.runtime.client.JobStatusMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.flink.kubernetes.operator.Utils.Constants.FLINK_NATIVE_K8S_OPERATOR_NAME;
-
-@ControllerConfiguration
-public class FlinkApplicationController implements Reconciler<FlinkApplication>, ErrorStatusHandler<FlinkApplication>, EventSourceInitializer<FlinkApplication> {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkApplicationController.class);
- private static final int POLL_PERIOD = 3000;
-
- private final KubernetesClient kubernetesClient;
-
- private final Map<String, Tuple2<FlinkApplication, Configuration>> flinkApps;
- private final Map<String, String> savepointLocation;
-
- private final String operatorNamespace;
-
- public FlinkApplicationController(KubernetesClient kubernetesClient, String namespace) {
- this.kubernetesClient = kubernetesClient;
- this.operatorNamespace = namespace;
-
- this.flinkApps = new ConcurrentHashMap<>();
- this.savepointLocation = new HashMap<>();
- }
-
- @Override
- public DeleteControl cleanup(FlinkApplication flinkApp, Context context) {
- LOG.info("Cleaning up application {}", flinkApp);
- kubernetesClient.apps().deployments().inNamespace(flinkApp.getMetadata().getNamespace()).withName(flinkApp.getMetadata().getName()).cascading(true).delete();
- return DeleteControl.defaultDelete();
- }
-
- @Override
- public UpdateControl<FlinkApplication> reconcile(FlinkApplication flinkApp, Context context) {
- LOG.info("Reconciling application {}", flinkApp);
- final String namespace = flinkApp.getMetadata().getNamespace();
- final String clusterId = flinkApp.getMetadata().getName();
- final Deployment deployment = kubernetesClient.apps().deployments().inNamespace(namespace).withName(clusterId).get();
-
- final Configuration effectiveConfig;
- try {
- effectiveConfig = FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
- } catch (Exception e) {
- LOG.error("Failed to load configuration", e);
- throw new RuntimeException("Failed to load configuration", e);
- }
-
- // Create new Flink application
- if (!flinkApps.containsKey(clusterId) && deployment == null) {
- // Deploy application
- final ClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
- final ApplicationDeployer deployer = new ApplicationClusterDeployer(clusterClientServiceLoader);
-
- final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(flinkApp.getSpec().getMainArgs(), flinkApp.getSpec().getEntryClass());
- try {
- deployer.run(effectiveConfig, applicationConfiguration);
- } catch (Exception e) {
- LOG.error("Failed to deploy cluster {}", clusterId, e);
- }
-
- flinkApps.put(clusterId, new Tuple2<>(flinkApp, effectiveConfig));
-
- updateIngress();
- } else {
- if (!flinkApps.containsKey(clusterId)) {
- LOG.info("Recovering {}", clusterId);
- flinkApps.put(clusterId, new Tuple2<>(flinkApp, effectiveConfig));
- return UpdateControl.noUpdate();
- }
- // Flink app is deleted externally
- if (deployment == null) {
- LOG.warn("{} is delete externally.", clusterId);
- flinkApps.remove(clusterId);
- return UpdateControl.noUpdate();
- }
-
- FlinkApplication oldFlinkApp = flinkApps.get(clusterId).f0;
-
- // Trigger a new savepoint
- triggerSavepoint(oldFlinkApp, flinkApp, effectiveConfig);
-
- // TODO support more fields updating, e.g. image, resources
- }
- return UpdateControl.updateResource(flinkApp);
- }
-
- @Override
- public List<EventSource> prepareEventSources(EventSourceContext<FlinkApplication> eventSourceContext) {
- // TODO: start status updated
-// return List.of(new PerResourcePollingEventSource<>(
-// new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD,
-// FlinkApplication.class));
- return Collections.emptyList();
- }
-
- @Override
- public Optional<FlinkApplication> updateErrorStatus(FlinkApplication flinkApplication, RetryInfo retryInfo, RuntimeException e) {
- //TODO: Set error status
- return Optional.empty();
- }
-
- private void updateIngress() {
- final List<IngressRule> ingressRules = new ArrayList<>();
- for (Tuple2<FlinkApplication, Configuration> entry : flinkApps.values()) {
- final FlinkApplication flinkApp = entry.f0;
- final String clusterId = flinkApp.getMetadata().getName();
- final int restPort = entry.f1.getInteger(RestOptions.PORT);
-
- final String ingressHost = clusterId + Constants.INGRESS_SUFFIX;
- ingressRules.add(new IngressRule(ingressHost, new HTTPIngressRuleValueBuilder().addNewPath().withNewBackend().withNewService().withName(clusterId + Constants.REST_SVC_NAME_SUFFIX).withNewPort(null, restPort).endService().endBackend().withPathType("Prefix").withPath("/").endPath().build()));
- }
- final Ingress ingress = new IngressBuilder().withApiVersion(Constants.INGRESS_API_VERSION).withNewMetadata().withName(FLINK_NATIVE_K8S_OPERATOR_NAME).endMetadata().withNewSpec().withRules(ingressRules).endSpec().build();
- // Get operator deploy
- final Deployment deployment = kubernetesClient.apps().deployments().inNamespace(operatorNamespace).withName(FLINK_NATIVE_K8S_OPERATOR_NAME).get();
- if (deployment == null) {
- LOG.warn("Could not find deployment {}", FLINK_NATIVE_K8S_OPERATOR_NAME);
- } else {
- KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress));
- }
- kubernetesClient.resourceList(ingress).inNamespace(operatorNamespace).createOrReplace();
- }
-
- private void triggerSavepoint(FlinkApplication oldFlinkApp, FlinkApplication newFlinkApp, Configuration effectiveConfig) {
- final int generation = newFlinkApp.getSpec().getSavepointGeneration();
- if (generation > oldFlinkApp.getSpec().getSavepointGeneration()) {
- try (ClusterClient<String> clusterClient = FlinkUtils.getRestClusterClient(effectiveConfig)) {
- final CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs();
- jobDetailsFuture.get().forEach(status -> {
- LOG.debug("JobStatus for {}: {}", clusterClient.getClusterId(), status);
- clusterClient.triggerSavepoint(status.getJobId(), null).thenAccept(path -> savepointLocation.put(status.getJobId().toString(), path)).join();
- });
- } catch (Exception e) {
- LOG.warn("Failed to trigger a new savepoint with generation {}", generation);
- }
- }
- }
-}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
new file mode 100644
index 0000000..2967dc0
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -0,0 +1,148 @@
+package org.apache.flink.kubernetes.operator.controller;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import io.javaoperatorsdk.operator.api.reconciler.*;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.Utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reconciler for {@link FlinkDeployment} resources. When no Deployment named after the resource
+ * exists it submits a Flink application cluster; otherwise it queries the cluster's jobs and
+ * publishes them as the resource status.
+ */
+@ControllerConfiguration
+public class FlinkDeploymentController implements Reconciler<FlinkDeployment>, ErrorStatusHandler<FlinkDeployment>, EventSourceInitializer<FlinkDeployment> {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
+    private static final int POLL_PERIOD = 3000;
+    /** Delay before the next reconciliation while the cluster or its jobs are not available yet. */
+    private static final long RESCHEDULE_DELAY_SECONDS = 10;
+
+    private final KubernetesClient kubernetesClient;
+
+    private final Map<String, String> savepointLocation;
+
+    private final String operatorNamespace;
+
+    /**
+     * @param kubernetesClient client used to look up and delete the cluster Deployment
+     * @param namespace namespace stored as the operator namespace
+     */
+    public FlinkDeploymentController(KubernetesClient kubernetesClient, String namespace) {
+        this.kubernetesClient = kubernetesClient;
+        this.operatorNamespace = namespace;
+        this.savepointLocation = new HashMap<>();
+    }
+
+    /** Deletes (cascading) the Deployment named after the resource, then returns the default delete control. */
+    @Override
+    public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
+        final String clusterId = flinkApp.getMetadata().getName();
+        LOG.info("Cleaning up application cluster {}", clusterId);
+        kubernetesClient.apps().deployments().inNamespace(flinkApp.getMetadata().getNamespace())
+                .withName(clusterId).cascading(true).delete();
+        return DeleteControl.defaultDelete();
+    }
+
+    /**
+     * Deploys the application cluster if its Deployment is missing, else refreshes the job statuses.
+     *
+     * @return a status update when job statuses were fetched; a reschedule after a deployment
+     *     attempt or when the cluster reports no jobs; no update when the status query failed
+     * @throws RuntimeException if the effective Flink configuration cannot be built
+     */
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
+        final String namespace = flinkApp.getMetadata().getNamespace();
+        final String clusterId = flinkApp.getMetadata().getName();
+        LOG.info("Reconciling application cluster {}", clusterId);
+        final Deployment deployment = kubernetesClient.apps().deployments().inNamespace(namespace).withName(clusterId).get();
+
+        final Configuration effectiveConfig;
+        try {
+            effectiveConfig = FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
+        } catch (Exception e) {
+            LOG.error("Failed to load configuration", e);
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+        return deployment == null
+                ? deployCluster(flinkApp, clusterId, effectiveConfig)
+                : refreshJobStatuses(flinkApp, clusterId, effectiveConfig);
+    }
+
+    /** Submits the application cluster; success is logged only when the deployer did not throw. */
+    private UpdateControl<FlinkDeployment> deployCluster(FlinkDeployment flinkApp, String clusterId, Configuration effectiveConfig) {
+        LOG.info("Deploying application cluster {}", clusterId);
+        final ApplicationDeployer deployer = new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader());
+        final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(
+                flinkApp.getSpec().getJob().getArgs(), flinkApp.getSpec().getJob().getEntryClass());
+        try {
+            deployer.run(effectiveConfig, applicationConfiguration);
+            LOG.info("{} deployed", clusterId);
+        } catch (Exception e) {
+            LOG.error("Failed to deploy {}", clusterId, e);
+        }
+        // Rescheduled on both outcomes: the next pass either fetches statuses or retries the deployment.
+        return UpdateControl.<FlinkDeployment>noUpdate().rescheduleAfter(RESCHEDULE_DELAY_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /** Lists the cluster's jobs through the REST cluster client and stores them as the resource status. */
+    private UpdateControl<FlinkDeployment> refreshJobStatuses(FlinkDeployment flinkApp, String clusterId, Configuration effectiveConfig) {
+        LOG.info("Getting job statuses for application cluster {}", clusterId);
+        try (ClusterClient<String> clusterClient = FlinkUtils.getRestClusterClient(effectiveConfig)) {
+            final CompletableFuture<Collection<JobStatusMessage>> jobsFuture = clusterClient.listJobs();
+            final JobStatus[] jobStatuses = jobsFuture.get().stream().map(FlinkUtils::convert).toArray(JobStatus[]::new);
+            final FlinkDeploymentStatus flinkAppStatus = new FlinkDeploymentStatus();
+            flinkAppStatus.setJobStatuses(jobStatuses);
+            flinkApp.setStatus(flinkAppStatus);
+            LOG.debug("{}", flinkAppStatus);
+            if (jobStatuses.length == 0) {
+                LOG.info("Got no job status for application cluster {} retrying", clusterId);
+                return UpdateControl.<FlinkDeployment>noUpdate().rescheduleAfter(RESCHEDULE_DELAY_SECONDS, TimeUnit.SECONDS);
+            }
+            LOG.info("Job statuses updated for application cluster {}", clusterId);
+            return UpdateControl.updateStatus(flinkApp);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while getting the job statuses for application cluster {}", clusterId, e);
+            return UpdateControl.noUpdate();
+        } catch (Exception e) {
+            LOG.warn("Failed to get the job statuses for application cluster {} giving up", clusterId, e);
+            return UpdateControl.noUpdate();
+        }
+    }
+
+    /** Registers no event sources yet. */
+    @Override
+    public List<EventSource> prepareEventSources(EventSourceContext<FlinkDeployment> eventSourceContext) {
+        // TODO: start status updates, e.g. with a PerResourcePollingEventSource that uses POLL_PERIOD.
+        return Collections.emptyList();
+    }
+
+    /** Placeholder: logs a warning and reports no status change. */
+    @Override
+    public Optional<FlinkDeployment> updateErrorStatus(FlinkDeployment flinkApp, RetryInfo retryInfo, RuntimeException e) {
+        LOG.warn("TODO: handle error status");
+        return Optional.empty();
+    }
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
similarity index 63%
rename from src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index 7b37869..2978588 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -6,12 +6,12 @@ import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkApplicationSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkApplicationStatus;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonDeserialize()
@Group("flink.io")
@Version("v1alpha1")
-public class FlinkApplication extends CustomResource<FlinkApplicationSpec, FlinkApplicationStatus> implements Namespaced {
+public class FlinkDeployment extends CustomResource<FlinkDeploymentSpec, FlinkDeploymentStatus> implements Namespaced {
}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
similarity index 57%
rename from src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
index 1f0115a..ef8a03c 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
@@ -2,5 +2,5 @@ package org.apache.flink.kubernetes.operator.crd;
import io.fabric8.kubernetes.client.CustomResourceList;
-public class FlinkApplicationList extends CustomResourceList<FlinkApplication> {
+public class FlinkDeploymentList extends CustomResourceList<FlinkDeployment> {
}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java
new file mode 100644
index 0000000..d58015e
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java
@@ -0,0 +1,10 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+/** Type of the {@code cancelMode} field of {@code JobSpec}; each constant's JSON name is the lower-case string in its {@code @JsonProperty}. */
+public enum CancelMode {
+ @JsonProperty("savepoint")
+ SAVEPOINT, // NOTE(review): presumably "take a savepoint when cancelling" — no code in this commit reads it yet; confirm
+ @JsonProperty("none")
+ NONE
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java
deleted file mode 100644
index b5fc82c..0000000
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.flink.kubernetes.operator.crd.spec;
-
-import io.fabric8.kubernetes.api.model.KubernetesResource;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-
-@Data
-@NoArgsConstructor
-public class FlinkApplicationSpec {
- private String imageName;
- private String imagePullPolicy;
-
- private String jarURI;
- private String[] mainArgs = new String[0];
- private String entryClass;
-
- private int parallelism;
-
- private Resource jobManagerResource;
- private Resource taskManagerResource;
-
- private String fromSavepoint;
- private boolean allowNonRestoredState = false;
- private String savepointsDir;
- private int savepointGeneration;
-
- private Map<String, String> flinkConfig;
-}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
new file mode 100644
index 0000000..f65e8e8
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -0,0 +1,22 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+/** Spec type of the {@code FlinkDeployment} custom resource; the controller passes it to {@code FlinkUtils.getEffectiveConfig}. */
+@Data
+@NoArgsConstructor
+public class FlinkDeploymentSpec {
+ private String image;
+ private String imagePullPolicy;
+ private String flinkVersion;
+ private Map<String, String> flinkConfiguration; // NOTE(review): presumably raw Flink config key/value pairs — confirm in FlinkUtils
+ private Pod podTemplate; // NOTE(review): JobManagerSpec/TaskManagerSpec have their own podTemplate; presumably this is the shared one — confirm
+ private JobManagerSpec jobManager;
+ private TaskManagerSpec taskManager;
+ private JobSpec job; // its args and entryClass are read by FlinkDeploymentController when deploying
+ private Map<String, String> logging;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
new file mode 100644
index 0000000..c11e65f
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
@@ -0,0 +1,14 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+/** Type of the {@code jobManager} field of {@code FlinkDeploymentSpec}. */
+@Data
+@NoArgsConstructor
+public class JobManagerSpec {
+ private Resource resource;
+ private int replicas; // primitive int: stays 0 when never set
+ private Pod podTemplate; // NOTE(review): presumably a JobManager-specific pod template — confirm how it combines with the spec-level one
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
new file mode 100644
index 0000000..dfd8adc
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -0,0 +1,16 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+/** Type of the {@code job} field of {@code FlinkDeploymentSpec}. */
+@Data
+@NoArgsConstructor
+public class JobSpec {
+ private String jarURI;
+ private int parallelism; // primitive int: stays 0 when never set
+ private String entryClass; // passed to ApplicationConfiguration by FlinkDeploymentController
+ private String[] args = new String[0]; // defaults to an empty array, not null; passed to ApplicationConfiguration
+ private RestoreMode restoreMode;
+ private CancelMode cancelMode;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
index e718860..7b190cc 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
@@ -5,8 +5,8 @@ import lombok.*;
@Data
@NoArgsConstructor
-public class Resource implements KubernetesResource {
+public class Resource {
private double cpu;
// 1024m, 1g
- private String mem;
+ private String memory;
}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java
new file mode 100644
index 0000000..5365430
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java
@@ -0,0 +1,12 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+/** Type of the {@code restoreMode} field of {@code JobSpec}; each constant's JSON name is the string in its {@code @JsonProperty}. */
+public enum RestoreMode {
+ @JsonProperty("savepoint")
+ SAVEPOINT,
+ @JsonProperty("last-state")
+ LAST_STATE, // JSON name uses a hyphen ("last-state"), unlike the constant name
+ @JsonProperty("none")
+ NONE
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
new file mode 100644
index 0000000..a995098
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -0,0 +1,14 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+/** Type of the {@code taskManager} field of {@code FlinkDeploymentSpec}. */
+@Data
+@NoArgsConstructor
+public class TaskManagerSpec {
+ private int taskSlots; // primitive int: stays 0 when never set
+ private Resource resource;
+ private Pod podTemplate; // NOTE(review): presumably a TaskManager-specific pod template — confirm how it combines with the spec-level one
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
similarity index 81%
rename from src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index 4b27f1b..88b1209 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -5,6 +5,6 @@ import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
-public class FlinkApplicationStatus {
+public class FlinkDeploymentStatus {
private JobStatus[] jobStatuses;
}
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
index 97e583e..369b39a 100644
--- a/src/main/resources/log4j2.properties
+++ b/src/main/resources/log4j2.properties
@@ -5,4 +5,4 @@ rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] %msg%n%throwable}