You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:33:55 UTC

[flink-kubernetes-operator] 02/23: CRD alignment + first working flink deployment

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit f3d12d6535b93a4002ba30fdb00573276d191fad
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Thu Jan 27 20:02:00 2022 +0100

    CRD alignment + first working flink deployment
---
 README.md                                          |  10 +-
 deploy/flink-operator.yaml                         |  82 +---------
 deploy/rbac.yaml                                   |  76 +++++++++
 examples/basic.yaml                                |  27 +++
 examples/cr.yaml                                   |  21 ---
 examples/pod-template.yaml                         |  49 ++++++
 pom.xml                                            |   2 +-
 ...sOperatorEntrypoint.java => FlinkOperator.java} |   8 +-
 .../kubernetes/operator/Utils/FlinkUtils.java      | 181 ++++++++++++---------
 .../controller/FlinkApplicationController.java     | 169 -------------------
 .../controller/FlinkDeploymentController.java      | 123 ++++++++++++++
 ...{FlinkApplication.java => FlinkDeployment.java} |   6 +-
 ...plicationList.java => FlinkDeploymentList.java} |   2 +-
 .../kubernetes/operator/crd/spec/CancelMode.java   |  10 ++
 .../operator/crd/spec/FlinkApplicationSpec.java    |  30 ----
 .../operator/crd/spec/FlinkDeploymentSpec.java     |  22 +++
 .../operator/crd/spec/JobManagerSpec.java          |  14 ++
 .../kubernetes/operator/crd/spec/JobSpec.java      |  16 ++
 .../kubernetes/operator/crd/spec/Resource.java     |   4 +-
 .../kubernetes/operator/crd/spec/RestoreMode.java  |  12 ++
 .../operator/crd/spec/TaskManagerSpec.java         |  14 ++
 ...ationStatus.java => FlinkDeploymentStatus.java} |   2 +-
 src/main/resources/log4j2.properties               |   2 +-
 23 files changed, 489 insertions(+), 393 deletions(-)

diff --git a/README.md b/README.md
index 845b97d..b0b58f9 100644
--- a/README.md
+++ b/README.md
@@ -7,17 +7,17 @@ mvn clean install
 ```
 
 ## How to Run
-* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. The CRD could be find [here](deploy/crd.yaml). If not, issue the following commands to apply:
+* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. If not, issue the following commands to apply:
 ```
-kubectl apply -f deploy/crd.yaml
+k apply -f target/classes/META-INF/fabric8/flinkapplications.flink.io-v1.yml
 ```
-* Build Docker Image
+* (Optional) Build Docker Image
 ```
 docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest
 ```
-* Start flink-operator deployment
-A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services.
+* Start flink-operator deployment. A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services.
 ```
+kubectl apply -f deploy/rbac.yaml
 kubectl apply -f deploy/flink-operator.yaml
 ```
 * Create a new Flink application
diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml
index 564ed12..c2698c7 100644
--- a/deploy/flink-operator.yaml
+++ b/deploy/flink-operator.yaml
@@ -15,7 +15,7 @@ spec:
       serviceAccountName: flink-operator
       containers:
       - name: flink-operator
-        image: docker.apple.com/gyula_fora/flink-java-operator:latest
+        image: docker.apple.com/matyas_orhidi/flink-java-operator:latest
         imagePullPolicy: Always
         env:
           - name: FLINK_CONF_DIR
@@ -26,7 +26,7 @@ spec:
       volumes:
       - name: flink-config-volume
         configMap:
-          name: flink-config
+          name: flink-operator-config
           items:
           - key: flink-conf.yaml
             path: flink-conf.yaml
@@ -38,7 +38,7 @@ spec:
 apiVersion: v1
 kind: ConfigMap
 metadata:
-  name: flink-config
+  name: flink-operator-config
   labels:
     app: flink
 data:
@@ -97,79 +97,3 @@ data:
     # Suppress the irrelevant (wrong) warnings from the Netty channel handler
     logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
     logger.netty.level = OFF
-
----
-
-apiVersion: v1
-kind: ServiceAccount
-metadata:
-  name: flink-operator
-
----
-
-apiVersion: v1
-kind: ClusterRole
-apiVersion: rbac.authorization.k8s.io/v1
-metadata:
-  name: flink-operator
-rules:
-- apiGroups:
-  - flink-operator
-  resources:
-  - "*"
-  verbs:
-  - "*"
-- apiGroups:
-  - ""
-  resources:
-  - pods
-  - services
-  - endpoints
-  - persistentvolumeclaims
-  - events
-  - configmaps
-  - secrets
-  verbs:
-  - "*"
-- apiGroups:
-  - apps
-  resources:
-  - deployments
-  - replicasets
-  verbs:
-  - "*"
-- apiGroups:
-  - extensions
-  resources:
-  - deployments
-  - ingresses
-  verbs:
-  - "*"
-- apiGroups:
-  - flink.io
-  resources:
-  - flinkapplications
-  verbs:
-  - "*"
-- apiGroups:
-  - networking.k8s.io
-  resources:
-  - ingresses
-  verbs:
-  - "*"
-
----
-
-apiVersion: v1
-kind: ClusterRoleBinding
-apiVersion: rbac.authorization.k8s.io/v1
-metadata:
-  name: flink-operator-cluster-role-binding
-subjects:
-- kind: ServiceAccount
-  name: flink-operator
-  namespace: default
-roleRef:
-  kind: ClusterRole
-  name: flink-operator
-  apiGroup: rbac.authorization.k8s.io
diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml
new file mode 100644
index 0000000..062ecab
--- /dev/null
+++ b/deploy/rbac.yaml
@@ -0,0 +1,76 @@
+---
+
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: flink-operator
+
+---
+
+apiVersion: v1
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: flink-operator
+rules:
+- apiGroups:
+  - flink-operator
+  resources:
+  - "*"
+  verbs:
+  - "*"
+- apiGroups:
+  - ""
+  resources:
+  - pods
+  - services
+  - endpoints
+  - persistentvolumeclaims
+  - events
+  - configmaps
+  - secrets
+  verbs:
+  - "*"
+- apiGroups:
+  - apps
+  resources:
+  - deployments
+  - replicasets
+  verbs:
+  - "*"
+- apiGroups:
+  - extensions
+  resources:
+  - deployments
+  - ingresses
+  verbs:
+  - "*"
+- apiGroups:
+  - flink.io
+  resources:
+  - flinkdeployments
+  - flinkdeployments/status
+  verbs:
+  - "*"
+- apiGroups:
+  - networking.k8s.io
+  resources:
+  - ingresses
+  verbs:
+  - "*"
+
+---
+
+apiVersion: v1
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: flink-operator-cluster-role-binding
+subjects:
+- kind: ServiceAccount
+  name: flink-operator
+  namespace: default
+roleRef:
+  kind: ClusterRole
+  name: flink-operator
+  apiGroup: rbac.authorization.k8s.io
diff --git a/examples/basic.yaml b/examples/basic.yaml
new file mode 100644
index 0000000..9fdc289
--- /dev/null
+++ b/examples/basic.yaml
@@ -0,0 +1,27 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: basic-example
+spec:
+  image: flink:1.14.3
+  flinkVersion: 1.14.3
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
+    kubernetes.jobmanager.service-account: flink-operator
+    kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    taskSlots: 2
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    cancelMode: none
+    restoreMode: none
diff --git a/examples/cr.yaml b/examples/cr.yaml
deleted file mode 100644
index 9695be0..0000000
--- a/examples/cr.yaml
+++ /dev/null
@@ -1,21 +0,0 @@
-apiVersion: flink.io/v1alpha1
-kind: FlinkApplication
-metadata:
-  namespace: default
-  name: flink-example-statemachine
-spec:
-  imageName: flink:latest
-  jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
-  parallelism: 1
-  jobManagerResource:
-    mem: 2048m
-    cpu: 1
-  taskManagerResource:
-    mem: 2048m
-    cpu: 1
-  savepointsDir: file:///tmp/savepoints
-  savepointGeneration: 0
-  flinkConfig:
-    taskmanager.numberOfTaskSlots: 2
-    kubernetes.jobmanager.service-account: flink-operator
-    kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
new file mode 100644
index 0000000..02fea81
--- /dev/null
+++ b/examples/pod-template.yaml
@@ -0,0 +1,49 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: pod-template-example
+spec:
+  image: flink:1.14.3
+  flinkVersion: 1.14.3
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
+    kubernetes.jobmanager.service-account: flink-operator
+    kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
+  podTemplate:
+    apiVersion: v1
+    kind: Pod
+    metadata:
+      name: pod-template
+    spec:
+      containers:
+        # Do not change the main container name
+        - name: flink-main-container
+          volumeMounts:
+            - mountPath: /opt/flink/log
+              name: flink-logs
+        # Sample sidecar container
+        - name: fluentbit
+          image: fluent/fluent-bit:1.8.12-debug
+          command: [ 'sh','-c','/fluent-bit/bin/fluent-bit -i tail -p path=/flink-logs/*.log -p multiline.parser=java -o stdout' ]
+          volumeMounts:
+            - mountPath: /flink-logs
+              name: flink-logs
+      volumes:
+        - name: flink-logs
+          emptyDir: { }
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    taskSlots: 2
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    cancelMode: none
+    restoreMode: none
diff --git a/pom.xml b/pom.xml
index 029a069..647868a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,7 @@
                             </artifactSet>
                             <transformers combine.children="append">
                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>org.apache.flink.kubernetes.operator.KubernetesOperatorEntrypoint</mainClass>
+                                    <mainClass>org.apache.flink.kubernetes.operator.FlinkOperator</mainClass>
                                 </transformer>
                                 <!-- The service transformer is needed to merge META-INF/services files -->
                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
similarity index 81%
rename from src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java
rename to src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index b7f705f..edb5b74 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -5,7 +5,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
-import org.apache.flink.kubernetes.operator.controller.FlinkApplicationController;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.takes.facets.fork.FkRegex;
@@ -18,8 +18,8 @@ import java.io.IOException;
 /**
  * Main Class for Flink native k8s operator.
  */
-public class KubernetesOperatorEntrypoint {
-    private static final Logger LOG = LoggerFactory.getLogger(KubernetesOperatorEntrypoint.class);
+public class FlinkOperator {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
 
     public static void main(String args[]) throws IOException {
 
@@ -33,7 +33,7 @@ public class KubernetesOperatorEntrypoint {
         Operator operator = new Operator(client,
                 new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
                         .build());
-        operator.register(new FlinkApplicationController(client, namespace));
+        operator.register(new FlinkDeploymentController(client, namespace));
         operator.installShutdownHook();
         operator.start();
 
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java
index 4a50a64..4a69696 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java
@@ -1,9 +1,9 @@
 package org.apache.flink.kubernetes.operator.Utils;
 
-import org.apache.flink.client.deployment.StandaloneClientFactory;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.internal.SerializationUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -14,86 +14,115 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkApplicationSpec;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.util.StringUtils;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.URI;
+import java.nio.file.Files;
 import java.util.Collections;
 
 public class FlinkUtils {
 
-	public static Configuration getEffectiveConfig(String namespace, String clusterId, FlinkApplicationSpec spec) throws Exception {
-		final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
-		final Configuration effectiveConfig;
-		if (flinkConfDir != null) {
-			effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir);
-		} else {
-			effectiveConfig = new Configuration();
-		}
-
-		// Basic config options
-		final URI uri = new URI(spec.getJarURI());
-		effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
-		effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-		effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET);
-		// Set rest service exposed type to clusterIP since we will use ingress to access the webui
-		effectiveConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-
-		// Image
-		if (!StringUtils.isNullOrWhitespaceOnly(spec.getImageName())) {
-			effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImageName());
-		}
-		if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) {
-			effectiveConfig.set(
-				KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
-				KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy()));
-		}
-
-		// Jars
-		effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
-
-		// Parallelism and Resource
-		if (spec.getParallelism() > 0) {
-			effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, spec.getParallelism());
-		}
-		if (spec.getJobManagerResource() != null) {
-			effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getJobManagerResource().getMem());
-			effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, spec.getJobManagerResource().getCpu());
-		}
-		if (spec.getTaskManagerResource() != null) {
-			effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getTaskManagerResource().getMem());
-			effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, spec.getTaskManagerResource().getCpu());
-		}
-
-		// Savepoint
-		if (!StringUtils.isNullOrWhitespaceOnly(spec.getFromSavepoint())) {
-			effectiveConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, spec.getFromSavepoint());
-			effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, spec.isAllowNonRestoredState());
-		}
-		if (!StringUtils.isNullOrWhitespaceOnly(spec.getSavepointsDir())) {
-			effectiveConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, spec.getSavepointsDir());
-		}
-
-		// Dynamic configuration
-		if (spec.getFlinkConfig() != null && !spec.getFlinkConfig().isEmpty()) {
-			spec.getFlinkConfig().forEach(effectiveConfig::setString);
-		}
-
-		return effectiveConfig;
-	}
-
-
-	public static ClusterClient<String> getRestClusterClient(Configuration config) throws Exception {
-		final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
-		final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
-		final int port = config.getInteger(RestOptions.PORT);
-		final String restServerAddress = String.format("http://%s-rest.%s:%s", clusterId, namespace, port);
-		return new RestClusterClient<>(
-			config,
-			clusterId,
-			(c,e) -> new StandaloneClientHAServices(restServerAddress));
-	}
+    public static Configuration getEffectiveConfig(String namespace, String clusterId, FlinkDeploymentSpec spec) throws Exception {
+        final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
+        final Configuration effectiveConfig;
+
+        if (flinkConfDir != null) {
+            effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir);
+        } else {
+            effectiveConfig = new Configuration();
+        }
+
+        effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
+        effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
+        effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET);
+
+        if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
+            effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) {
+            effectiveConfig.set(
+                    KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
+                    KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy()));
+        }
+
+        if (spec.getFlinkConfiguration() != null && !spec.getFlinkConfiguration().isEmpty()) {
+            spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
+        }
+
+        // Pod template
+        if (spec.getPodTemplate() != null) {
+            effectiveConfig.set(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, createTempFile(spec.getPodTemplate()));
+        }
+
+        if (spec.getJobManager() != null) {
+            if (spec.getJobManager().getResource() != null) {
+                effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getJobManager().getResource().getMemory());
+                effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, spec.getJobManager().getResource().getCpu());
+            }
+
+            if (spec.getJobManager().getPodTemplate() != null) {
+                effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, createTempFile(spec.getJobManager().getPodTemplate()));
+            }
+        }
+
+        if (spec.getTaskManager() != null) {
+            if (spec.getTaskManager().getTaskSlots() > 0) {
+                effectiveConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, spec.getTaskManager().getTaskSlots());
+            }
+
+            if (spec.getTaskManager().getResource() != null) {
+                effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getTaskManager().getResource().getMemory());
+                effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, spec.getTaskManager().getResource().getCpu());
+            }
+
+            if (spec.getTaskManager().getPodTemplate() != null) {
+                effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, createTempFile(spec.getTaskManager().getPodTemplate()));
+            }
+        }
+
+        if (spec.getJob() != null) {
+            final URI uri = new URI(spec.getJob().getJarURI());
+            effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
+
+            if (spec.getJob().getParallelism() > 0) {
+                effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, spec.getJob().getParallelism());
+            }
+        }
+
+        return effectiveConfig;
+    }
+
+    private static String createTempFile(Pod podTemplate) throws IOException {
+        File tmp = File.createTempFile("podTemplate_", ".yaml");
+        Files.write(tmp.toPath(), SerializationUtils.dumpAsYaml(podTemplate).getBytes());
+        tmp.deleteOnExit();
+        return tmp.getAbsolutePath();
+    }
+
+    public static ClusterClient<String> getRestClusterClient(Configuration config) throws Exception {
+        final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
+        final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
+        final int port = config.getInteger(RestOptions.PORT);
+        final String restServerAddress = String.format("http://%s-rest.%s:%s", clusterId, namespace, port);
+        return new RestClusterClient<>(
+                config,
+                clusterId,
+                (c, e) -> new StandaloneClientHAServices(restServerAddress));
+    }
+
+    public static JobStatus convert(JobStatusMessage message) {
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setJobId(message.getJobId().toString());
+        jobStatus.setJobName(message.getJobName());
+        jobStatus.setState(message.getJobState().toString());
+        return jobStatus;
+    }
+
 }
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
deleted file mode 100644
index 54e47f0..0000000
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package org.apache.flink.kubernetes.operator.controller;
-
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
-import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
-import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
-import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-import io.javaoperatorsdk.operator.api.reconciler.*;
-import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.kubernetes.operator.Utils.FlinkUtils;
-import org.apache.flink.kubernetes.operator.Utils.Constants;
-import org.apache.flink.kubernetes.operator.Utils.KubernetesUtils;
-import org.apache.flink.kubernetes.operator.crd.FlinkApplication;
-import org.apache.flink.kubernetes.operator.crd.spec.Resource;
-import org.apache.flink.runtime.client.JobStatusMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.flink.kubernetes.operator.Utils.Constants.FLINK_NATIVE_K8S_OPERATOR_NAME;
-
-@ControllerConfiguration
-public class FlinkApplicationController implements Reconciler<FlinkApplication>, ErrorStatusHandler<FlinkApplication>, EventSourceInitializer<FlinkApplication> {
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkApplicationController.class);
-    private static final int POLL_PERIOD = 3000;
-
-    private final KubernetesClient kubernetesClient;
-
-    private final Map<String, Tuple2<FlinkApplication, Configuration>> flinkApps;
-    private final Map<String, String> savepointLocation;
-
-    private final String operatorNamespace;
-
-    public FlinkApplicationController(KubernetesClient kubernetesClient, String namespace) {
-        this.kubernetesClient = kubernetesClient;
-        this.operatorNamespace = namespace;
-
-        this.flinkApps = new ConcurrentHashMap<>();
-        this.savepointLocation = new HashMap<>();
-    }
-
-    @Override
-    public DeleteControl cleanup(FlinkApplication flinkApp, Context context) {
-        LOG.info("Cleaning up application {}", flinkApp);
-        kubernetesClient.apps().deployments().inNamespace(flinkApp.getMetadata().getNamespace()).withName(flinkApp.getMetadata().getName()).cascading(true).delete();
-        return DeleteControl.defaultDelete();
-    }
-
-    @Override
-    public UpdateControl<FlinkApplication> reconcile(FlinkApplication flinkApp, Context context) {
-        LOG.info("Reconciling application {}", flinkApp);
-        final String namespace = flinkApp.getMetadata().getNamespace();
-        final String clusterId = flinkApp.getMetadata().getName();
-        final Deployment deployment = kubernetesClient.apps().deployments().inNamespace(namespace).withName(clusterId).get();
-
-        final Configuration effectiveConfig;
-        try {
-            effectiveConfig = FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
-        } catch (Exception e) {
-            LOG.error("Failed to load configuration", e);
-            throw new RuntimeException("Failed to load configuration", e);
-        }
-
-        // Create new Flink application
-        if (!flinkApps.containsKey(clusterId) && deployment == null) {
-            // Deploy application
-            final ClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
-            final ApplicationDeployer deployer = new ApplicationClusterDeployer(clusterClientServiceLoader);
-
-            final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(flinkApp.getSpec().getMainArgs(), flinkApp.getSpec().getEntryClass());
-            try {
-                deployer.run(effectiveConfig, applicationConfiguration);
-            } catch (Exception e) {
-                LOG.error("Failed to deploy cluster {}", clusterId, e);
-            }
-
-            flinkApps.put(clusterId, new Tuple2<>(flinkApp, effectiveConfig));
-
-            updateIngress();
-        } else {
-            if (!flinkApps.containsKey(clusterId)) {
-                LOG.info("Recovering {}", clusterId);
-                flinkApps.put(clusterId, new Tuple2<>(flinkApp, effectiveConfig));
-                return UpdateControl.noUpdate();
-            }
-            // Flink app is deleted externally
-            if (deployment == null) {
-                LOG.warn("{} is delete externally.", clusterId);
-                flinkApps.remove(clusterId);
-                return UpdateControl.noUpdate();
-            }
-
-            FlinkApplication oldFlinkApp = flinkApps.get(clusterId).f0;
-
-            // Trigger a new savepoint
-            triggerSavepoint(oldFlinkApp, flinkApp, effectiveConfig);
-
-            // TODO support more fields updating, e.g. image, resources
-        }
-        return UpdateControl.updateResource(flinkApp);
-    }
-
-    @Override
-    public List<EventSource> prepareEventSources(EventSourceContext<FlinkApplication> eventSourceContext) {
-        // TODO: start status updated
-//        return List.of(new PerResourcePollingEventSource<>(
-//                new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD,
-//                FlinkApplication.class));
-        return Collections.emptyList();
-    }
-
-    @Override
-    public Optional<FlinkApplication> updateErrorStatus(FlinkApplication flinkApplication, RetryInfo retryInfo, RuntimeException e) {
-        //TODO: Set error status
-        return Optional.empty();
-    }
-
-    private void updateIngress() {
-        final List<IngressRule> ingressRules = new ArrayList<>();
-        for (Tuple2<FlinkApplication, Configuration> entry : flinkApps.values()) {
-            final FlinkApplication flinkApp = entry.f0;
-            final String clusterId = flinkApp.getMetadata().getName();
-            final int restPort = entry.f1.getInteger(RestOptions.PORT);
-
-            final String ingressHost = clusterId + Constants.INGRESS_SUFFIX;
-            ingressRules.add(new IngressRule(ingressHost, new HTTPIngressRuleValueBuilder().addNewPath().withNewBackend().withNewService().withName(clusterId + Constants.REST_SVC_NAME_SUFFIX).withNewPort(null, restPort).endService().endBackend().withPathType("Prefix").withPath("/").endPath().build()));
-        }
-        final Ingress ingress = new IngressBuilder().withApiVersion(Constants.INGRESS_API_VERSION).withNewMetadata().withName(FLINK_NATIVE_K8S_OPERATOR_NAME).endMetadata().withNewSpec().withRules(ingressRules).endSpec().build();
-        // Get operator deploy
-        final Deployment deployment = kubernetesClient.apps().deployments().inNamespace(operatorNamespace).withName(FLINK_NATIVE_K8S_OPERATOR_NAME).get();
-        if (deployment == null) {
-            LOG.warn("Could not find deployment {}", FLINK_NATIVE_K8S_OPERATOR_NAME);
-        } else {
-            KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress));
-        }
-        kubernetesClient.resourceList(ingress).inNamespace(operatorNamespace).createOrReplace();
-    }
-
-    private void triggerSavepoint(FlinkApplication oldFlinkApp, FlinkApplication newFlinkApp, Configuration effectiveConfig) {
-        final int generation = newFlinkApp.getSpec().getSavepointGeneration();
-        if (generation > oldFlinkApp.getSpec().getSavepointGeneration()) {
-            try (ClusterClient<String> clusterClient = FlinkUtils.getRestClusterClient(effectiveConfig)) {
-                final CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs();
-                jobDetailsFuture.get().forEach(status -> {
-                    LOG.debug("JobStatus for {}: {}", clusterClient.getClusterId(), status);
-                    clusterClient.triggerSavepoint(status.getJobId(), null).thenAccept(path -> savepointLocation.put(status.getJobId().toString(), path)).join();
-                });
-            } catch (Exception e) {
-                LOG.warn("Failed to trigger a new savepoint with generation {}", generation);
-            }
-        }
-    }
-}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
new file mode 100644
index 0000000..2967dc0
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -0,0 +1,123 @@
+package org.apache.flink.kubernetes.operator.controller;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import io.javaoperatorsdk.operator.api.reconciler.*;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.Utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+@ControllerConfiguration
+public class FlinkDeploymentController implements Reconciler<FlinkDeployment>, ErrorStatusHandler<FlinkDeployment>, EventSourceInitializer<FlinkDeployment> {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
+    private static final int POLL_PERIOD = 3000;
+
+    private final KubernetesClient kubernetesClient;
+
+    private final Map<String, String> savepointLocation;
+
+    private final String operatorNamespace;
+
+    public FlinkDeploymentController(KubernetesClient kubernetesClient, String namespace) {
+        this.kubernetesClient = kubernetesClient;
+        this.operatorNamespace = namespace;
+        this.savepointLocation = new HashMap<>();
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
+        LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName());
+        kubernetesClient.apps().deployments().inNamespace(flinkApp.getMetadata().getNamespace()).withName(
+                flinkApp.getMetadata().getName()).cascading(true).delete();
+        return DeleteControl.defaultDelete();
+    }
+
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
+        LOG.info("Reconciling application cluster {}", flinkApp.getMetadata().getName());
+        final String namespace = flinkApp.getMetadata().getNamespace();
+        final String clusterId = flinkApp.getMetadata().getName();
+        final Deployment deployment = kubernetesClient.apps().deployments().inNamespace(namespace).withName(clusterId).get();
+
+        final Configuration effectiveConfig;
+        try {
+            effectiveConfig = FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
+        } catch (Exception e) {
+            LOG.error("Failed to load configuration", e);
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+        if (deployment == null) {
+            LOG.info("Deploying application cluster {}", flinkApp.getMetadata().getName());
+            final ClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
+            final ApplicationDeployer deployer = new ApplicationClusterDeployer(clusterClientServiceLoader);
+
+            final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(
+                    flinkApp.getSpec().getJob().getArgs(),
+                    flinkApp.getSpec().getJob().getEntryClass());
+            try {
+                deployer.run(effectiveConfig, applicationConfiguration);
+            } catch (Exception e) {
+                LOG.error("Failed to deploy {}", clusterId, e);
+            }
+            LOG.info("{} deployed", flinkApp.getMetadata().getName());
+            return UpdateControl.<FlinkDeployment>noUpdate().rescheduleAfter(10, TimeUnit.SECONDS);
+        } else {
+            LOG.info("Getting job statuses for application cluster {}", flinkApp.getMetadata().getName());
+            FlinkDeploymentStatus flinkAppStatus = new FlinkDeploymentStatus();
+            try (ClusterClient<String> clusterClient = FlinkUtils.getRestClusterClient(effectiveConfig)) {
+                final CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs();
+                JobStatus[] jobStatuses = jobDetailsFuture.get().stream()
+                        .map(FlinkUtils::convert)
+                        .toArray(size -> new JobStatus[size]);
+                flinkAppStatus.setJobStatuses(jobStatuses);
+                flinkApp.setStatus(flinkAppStatus);
+                LOG.debug(flinkAppStatus.toString());
+                if (flinkApp.getStatus().getJobStatuses().length == 0) {
+                    LOG.info("Got no job status for application cluster {} retrying", flinkApp.getMetadata().getName());
+                    return UpdateControl.<FlinkDeployment>noUpdate().rescheduleAfter(10, TimeUnit.SECONDS);
+                } else {
+                    LOG.info("Job statuses updated for application cluster {}", flinkApp.getMetadata().getName());
+                    return UpdateControl.updateStatus(flinkApp);
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to get the job statuses for application cluster {} giving up", flinkApp, e);
+                return UpdateControl.noUpdate();
+            }
+        }
+    }
+
+    @Override
+    public List<EventSource> prepareEventSources(EventSourceContext<FlinkDeployment> eventSourceContext) {
+        // TODO: start status updated
+//        return List.of(new PerResourcePollingEventSource<>(
+//                new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD,
+//                FlinkApplication.class));
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Optional<FlinkDeployment> updateErrorStatus(FlinkDeployment flinkApp, RetryInfo retryInfo, RuntimeException e) {
+        LOG.warn("TODO: handle error status");
+        return Optional.empty();
+    }
+
+
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
similarity index 63%
rename from src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index 7b37869..2978588 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -6,12 +6,12 @@ import io.fabric8.kubernetes.api.model.Namespaced;
 import io.fabric8.kubernetes.client.CustomResource;
 import io.fabric8.kubernetes.model.annotation.Group;
 import io.fabric8.kubernetes.model.annotation.Version;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkApplicationSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkApplicationStatus;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 
 @JsonInclude(JsonInclude.Include.NON_NULL)
 @JsonDeserialize()
 @Group("flink.io")
 @Version("v1alpha1")
-public class FlinkApplication extends CustomResource<FlinkApplicationSpec, FlinkApplicationStatus> implements Namespaced {
+public class FlinkDeployment extends CustomResource<FlinkDeploymentSpec, FlinkDeploymentStatus> implements Namespaced {
 }
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
similarity index 57%
rename from src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
index 1f0115a..ef8a03c 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
@@ -2,5 +2,5 @@ package org.apache.flink.kubernetes.operator.crd;
 
 import io.fabric8.kubernetes.client.CustomResourceList;
 
-public class FlinkApplicationList extends CustomResourceList<FlinkApplication> {
+public class FlinkDeploymentList extends CustomResourceList<FlinkDeployment> {
 }
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java
new file mode 100644
index 0000000..d58015e
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java
@@ -0,0 +1,10 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum CancelMode {
+    @JsonProperty("savepoint")
+    SAVEPOINT,
+    @JsonProperty("none")
+    NONE
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java
deleted file mode 100644
index b5fc82c..0000000
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.flink.kubernetes.operator.crd.spec;
-
-import io.fabric8.kubernetes.api.model.KubernetesResource;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-
-@Data
-@NoArgsConstructor
-public class FlinkApplicationSpec {
-    private String imageName;
-    private String imagePullPolicy;
-
-    private String jarURI;
-    private String[] mainArgs = new String[0];
-    private String entryClass;
-
-    private int parallelism;
-
-    private Resource jobManagerResource;
-    private Resource taskManagerResource;
-
-    private String fromSavepoint;
-    private boolean allowNonRestoredState = false;
-    private String savepointsDir;
-    private int savepointGeneration;
-
-    private Map<String, String> flinkConfig;
-}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
new file mode 100644
index 0000000..f65e8e8
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -0,0 +1,22 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+public class FlinkDeploymentSpec {
+    private String image;
+    private String imagePullPolicy;
+    private String flinkVersion;
+    private Map<String, String> flinkConfiguration;
+    private Pod podTemplate;
+    private JobManagerSpec jobManager;
+    private TaskManagerSpec taskManager;
+    private JobSpec job;
+    private Map<String, String> logging;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
new file mode 100644
index 0000000..c11e65f
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
@@ -0,0 +1,14 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class JobManagerSpec {
+    private Resource resource;
+    private int replicas;
+    private Pod podTemplate;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
new file mode 100644
index 0000000..dfd8adc
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -0,0 +1,16 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class JobSpec {
+    private String jarURI;
+    private int parallelism;
+    private String entryClass;
+    private String[] args = new String[0];
+    private RestoreMode restoreMode;
+    private CancelMode cancelMode;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
index e718860..7b190cc 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
@@ -5,8 +5,8 @@ import lombok.*;
 
 @Data
 @NoArgsConstructor
-public class Resource implements KubernetesResource {
+public class Resource {
     private double cpu;
     // 1024m, 1g
-    private String mem;
+    private String memory;
 }
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java
new file mode 100644
index 0000000..5365430
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java
@@ -0,0 +1,12 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RestoreMode {
+    @JsonProperty("savepoint")
+    SAVEPOINT,
+    @JsonProperty("last-state")
+    LAST_STATE,
+    @JsonProperty("none")
+    NONE
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
new file mode 100644
index 0000000..a995098
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -0,0 +1,14 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class TaskManagerSpec {
+    private int taskSlots;
+    private Resource resource;
+    private Pod podTemplate;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
similarity index 81%
rename from src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index 4b27f1b..88b1209 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -5,6 +5,6 @@ import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
-public class FlinkApplicationStatus  {
+public class FlinkDeploymentStatus {
     private JobStatus[] jobStatuses;
 }
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
index 97e583e..369b39a 100644
--- a/src/main/resources/log4j2.properties
+++ b/src/main/resources/log4j2.properties
@@ -5,4 +5,4 @@ rootLogger.appenderRef.console.ref = ConsoleAppender
 appender.console.name = ConsoleAppender
 appender.console.type = CONSOLE
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] %msg%n%throwable}