You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/26 09:01:15 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28646] Handle scaling operation separately in reconciler/service

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a346ec7 [FLINK-28646] Handle scaling operation separately in reconciler/service
0a346ec7 is described below

commit 0a346ec7c6e8b770c1c10aa01686ee68c9dd4f27
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Thu Jul 28 18:06:51 2022 +0200

    [FLINK-28646] Handle scaling operation separately in reconciler/service
---
 examples/basic-reactive.yaml                       |  62 ++++
 .../operator/crd/spec/AbstractFlinkSpec.java       |  17 +-
 .../operator/crd/spec/JobManagerSpec.java          |   2 +
 .../kubernetes/operator/crd/spec/JobSpec.java      |  26 +-
 .../operator/crd/spec/TaskManagerSpec.java         |   2 +
 .../operator/reconciler/ReconciliationUtils.java   |   2 +-
 .../AbstractFlinkResourceReconciler.java           |  30 +-
 .../deployment/AbstractJobReconciler.java          |  22 +-
 .../reconciler/deployment/SessionReconciler.java   |   4 +-
 .../diff/Diff.java}                                |  33 +-
 .../operator/reconciler/diff/DiffBuilder.java      | 351 +++++++++++++++++++++
 .../operator/reconciler/diff/DiffResult.java       |  84 +++++
 .../diff/DiffType.java}                            |  29 +-
 .../diff/Diffable.java}                            |  29 +-
 .../reconciler/diff/ReflectiveDiffBuilder.java     | 149 +++++++++
 .../diff/SpecDiff.java}                            |  40 ++-
 .../kubernetes/operator/service/FlinkService.java  |   4 +
 .../operator/service/StandaloneFlinkService.java   |  38 +++
 .../flink/kubernetes/operator/TestUtils.java       |   5 +
 .../kubernetes/operator/TestingFlinkService.java   |  17 +
 .../deployment/ApplicationReconcilerTest.java      |  80 ++++-
 .../operator/reconciler/diff/SpecDiffTest.java     | 170 ++++++++++
 .../service/StandaloneFlinkServiceTest.java        | 105 +++++-
 .../kubernetes/operator/utils/FlinkUtilsTest.java  |   2 +-
 24 files changed, 1203 insertions(+), 100 deletions(-)

diff --git a/examples/basic-reactive.yaml b/examples/basic-reactive.yaml
new file mode 100644
index 00000000..eddf58b3
--- /dev/null
+++ b/examples/basic-reactive.yaml
@@ -0,0 +1,62 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: basic-reactive-example
+spec:
+  image: flink:1.15
+  flinkVersion: v1_15
+  flinkConfiguration:
+    scheduler-mode: REACTIVE
+    taskmanager.numberOfTaskSlots: "2"
+    state.savepoints.dir: file:///flink-data/savepoints
+    state.checkpoints.dir: file:///flink-data/checkpoints
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: file:///flink-data/ha
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  podTemplate:
+    spec:
+      containers:
+        - name: flink-main-container
+          volumeMounts:
+          - mountPath: /flink-data
+            name: flink-volume
+      volumes:
+      - name: flink-volume
+        hostPath:
+          # directory location on host
+          path: /tmp/flink
+          # this field is optional
+          type: Directory
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    upgradeMode: savepoint
+    state: running
+    savepointTriggerNonce: 0
+  mode: standalone
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java
index 8407d904..d7d1e2a5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java
@@ -18,6 +18,11 @@
 package org.apache.flink.kubernetes.operator.crd.spec;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
+import org.apache.flink.kubernetes.operator.reconciler.diff.Diffable;
+import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
+import org.apache.flink.kubernetes.operator.reconciler.diff.SpecDiff;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -32,7 +37,7 @@ import java.util.Map;
 @AllArgsConstructor
 @NoArgsConstructor
 @SuperBuilder
-public abstract class AbstractFlinkSpec {
+public abstract class AbstractFlinkSpec implements Diffable<AbstractFlinkSpec> {
 
     /** Job specification for application deployments/session job. Null for session clusters. */
     private JobSpec job;
@@ -44,5 +49,15 @@ public abstract class AbstractFlinkSpec {
     private Long restartNonce;
 
     /** Flink configuration overrides for the Flink deployment or Flink session job. */
+    @SpecDiff.Config({
+        @SpecDiff.Entry(prefix = "parallelism.default", type = DiffType.IGNORE),
+        @SpecDiff.Entry(prefix = "kubernetes.operator", type = DiffType.IGNORE),
+        @SpecDiff.Entry(prefix = "metrics.scope.k8soperator", type = DiffType.IGNORE)
+    })
     private Map<String, String> flinkConfiguration;
+
+    @Override
+    public DiffResult<AbstractFlinkSpec> diff(AbstractFlinkSpec spec) {
+        return new ReflectiveDiffBuilder<>(this, spec).build();
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
index d32ab0e2..634e7016 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Experimental;
 
 import io.fabric8.kubernetes.api.model.Pod;
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
@@ -29,6 +30,7 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
+@Builder
 public class JobManagerSpec {
     /** Resource specification for the JobManager pods. */
     private Resource resource;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
index a6574994..3583a5e3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -18,6 +18,11 @@
 package org.apache.flink.kubernetes.operator.crd.spec;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
+import org.apache.flink.kubernetes.operator.reconciler.diff.Diffable;
+import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
+import org.apache.flink.kubernetes.operator.reconciler.diff.SpecDiff;
 
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -31,7 +36,8 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @AllArgsConstructor
 @Builder
-public class JobSpec {
+@EqualsAndHashCode
+public class JobSpec implements Diffable<JobSpec> {
 
     /**
      * URI of the job jar within the Flink docker container. For example: Example:
@@ -40,6 +46,7 @@ public class JobSpec {
     private String jarURI;
 
     /** Parallelism of the Flink job. */
+    @SpecDiff(DiffType.SCALE)
     private int parallelism;
 
     /** Fully qualified main class name of the Flink job. */
@@ -55,17 +62,26 @@ public class JobSpec {
      * Nonce used to manually trigger savepoint for the running job. In order to trigger a
      * savepoint, change the number to anything other than the current value.
      */
-    @EqualsAndHashCode.Exclude private Long savepointTriggerNonce;
+    @SpecDiff(DiffType.IGNORE)
+    private Long savepointTriggerNonce;
 
     /**
      * Savepoint path used by the job the first time it is deployed. Upgrades/redeployments will not
      * be affected.
      */
-    @EqualsAndHashCode.Exclude private String initialSavepointPath;
+    @SpecDiff(DiffType.IGNORE)
+    private String initialSavepointPath;
 
     /** Upgrade mode of the Flink job. */
-    @EqualsAndHashCode.Exclude private UpgradeMode upgradeMode = UpgradeMode.STATELESS;
+    @SpecDiff(DiffType.IGNORE)
+    private UpgradeMode upgradeMode = UpgradeMode.STATELESS;
 
     /** Allow checkpoint state that cannot be mapped to any job vertex in tasks. */
-    @EqualsAndHashCode.Exclude private Boolean allowNonRestoredState;
+    @SpecDiff(DiffType.IGNORE)
+    private Boolean allowNonRestoredState;
+
+    @Override
+    public DiffResult<JobSpec> diff(JobSpec spec) {
+        return new ReflectiveDiffBuilder<>(this, spec).build();
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
index 400a2cc6..860ecd91 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.model.annotation.SpecReplicas;
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
@@ -30,6 +31,7 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
+@Builder
 public class TaskManagerSpec {
     /** Resource specification for the TaskManager pods. */
     private Resource resource;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index dc6bd6c2..389e59d6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -265,8 +265,8 @@ public class ReconciliationUtils {
      *
      * @param specWithMetaString JSON string.
      * @param specClass Spec class for deserialization.
-     * @return Tuple2 of spec and meta.
      * @param <T> Spec type.
+     * @return Tuple2 of spec and meta.
      */
     public static <T extends AbstractFlinkSpec>
             Tuple2<T, ReconciliationMetadata> deserializeSpecWithMeta(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index c51ba32c..436f96ec 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -67,7 +68,8 @@ public abstract class AbstractFlinkResourceReconciler<
     protected final KubernetesClient kubernetesClient;
 
     public static final String MSG_SUSPENDED = "Suspending existing deployment.";
-    public static final String MSG_SPEC_CHANGED = "Detected spec change, starting reconciliation.";
+    public static final String MSG_SPEC_CHANGED =
+            "%s change(s) detected (%s), starting reconciliation.";
     public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
     public static final String MSG_SUBMIT = "Starting deployment";
 
@@ -122,25 +124,33 @@ public abstract class AbstractFlinkResourceReconciler<
                 cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
         SPEC currentDeploySpec = cr.getSpec();
 
+        var specDiff = currentDeploySpec.diff(lastReconciledSpec);
+
+        var flinkService = getFlinkService(cr, ctx);
+
         boolean specChanged =
-                reconciliationStatus.getState() == ReconciliationState.UPGRADING
-                        || !currentDeploySpec.equals(lastReconciledSpec);
+                DiffType.IGNORE != specDiff.getType()
+                        || reconciliationStatus.getState() == ReconciliationState.UPGRADING;
+
         var observeConfig = getObserveConfig(cr, ctx);
-        var flinkService = getFlinkService(cr, ctx);
+
         if (specChanged) {
+
             if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                 return;
             }
-            LOG.info(MSG_SPEC_CHANGED);
+
+            var specChangeMessage = String.format(MSG_SPEC_CHANGED, specDiff.getType(), specDiff);
+            LOG.info(specChangeMessage);
             if (reconciliationStatus.getState() != ReconciliationState.UPGRADING) {
                 eventRecorder.triggerEvent(
                         cr,
                         EventRecorder.Type.Normal,
                         EventRecorder.Reason.SpecChanged,
                         EventRecorder.Component.JobManagerDeployment,
-                        MSG_SPEC_CHANGED);
+                        specChangeMessage);
             }
-            reconcileSpecChange(cr, ctx, observeConfig, deployConfig);
+            reconcileSpecChange(cr, ctx, observeConfig, deployConfig, specDiff.getType());
         } else if (shouldRollBack(cr, observeConfig, flinkService)) {
             // Rollbacks are executed in two steps, we initiate it first then return
             if (initiateRollBack(status)) {
@@ -200,7 +210,11 @@ public abstract class AbstractFlinkResourceReconciler<
      * @throws Exception Error during spec upgrade.
      */
     protected abstract void reconcileSpecChange(
-            CR cr, Context<?> ctx, Configuration observeConfig, Configuration deployConfig)
+            CR cr,
+            Context<?> ctx,
+            Configuration observeConfig,
+            Configuration deployConfig,
+            DiffType diffType)
             throws Exception;
 
     /**
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index fea52829..4f850abe 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -78,13 +79,32 @@ public abstract class AbstractJobReconciler<
 
     @Override
     protected void reconcileSpecChange(
-            CR resource, Context<?> ctx, Configuration observeConfig, Configuration deployConfig)
+            CR resource,
+            Context<?> ctx,
+            Configuration observeConfig,
+            Configuration deployConfig,
+            DiffType diffType)
             throws Exception {
+
         STATUS status = resource.getStatus();
         var reconciliationStatus = status.getReconciliationStatus();
         SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
         SPEC currentDeploySpec = resource.getSpec();
 
+        if (diffType == DiffType.SCALE) {
+            boolean scaled =
+                    getFlinkService(resource, ctx)
+                            .scale(
+                                    resource.getMetadata(),
+                                    resource.getSpec().getJob(),
+                                    deployConfig);
+            if (scaled) {
+                LOG.info("Reactive scaling succeeded");
+                ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
+                return;
+            }
+        }
+
         JobState currentJobState = lastReconciledSpec.getJob().getState();
         JobState desiredJobState = currentDeploySpec.getJob().getState();
         if (currentJobState == JobState.RUNNING) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 2e2efcf6..1131a5e9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -92,7 +93,8 @@ public class SessionReconciler
             FlinkDeployment deployment,
             Context<?> ctx,
             Configuration observeConfig,
-            Configuration deployConfig)
+            Configuration deployConfig,
+            DiffType type)
             throws Exception {
         deleteSessionCluster(deployment, observeConfig);
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diff.java
similarity index 59%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diff.java
index d32ab0e2..9f68302f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diff.java
@@ -15,27 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.reconciler.diff;
 
 import org.apache.flink.annotation.Experimental;
 
-import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.Value;
 
-/** JobManager spec. */
+/**
+ * Contains the differences between two {@link Diffable} class fields.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/Diff.java
+ */
 @Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobManagerSpec {
-    /** Resource specification for the JobManager pods. */
-    private Resource resource;
-
-    /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
-    private int replicas = 1;
-
-    /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
-    private Pod podTemplate;
+@Value
+public class Diff<T> {
+    @NonNull String fieldName;
+    T left;
+    T right;
+    @NonNull DiffType type;
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffBuilder.java
new file mode 100644
index 00000000..3817cc93
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffBuilder.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.diff;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.NonNull;
+import org.apache.commons.lang3.builder.Builder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Assists in implementing {@link Diffable#diff(Object)} methods.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/DiffBuilder.java
+ */
+@Experimental
+public class DiffBuilder<T> implements Builder<DiffResult<?>> {
+
+    private static final String DELIMITER = ".";
+
+    private final T left;
+    private final T right;
+
+    private final List<Diff<?>> diffs;
+    private boolean triviallyEqual;
+
+    public DiffBuilder(@NonNull final T left, @NonNull final T right) {
+        this.diffs = new ArrayList<>();
+        this.left = left;
+        this.right = right;
+        this.triviallyEqual = left == right || left.equals(right);
+    }
+
+    public DiffBuilder<T> testTriviallyEqual(boolean testTriviallyEqual) {
+        this.triviallyEqual = this.triviallyEqual && testTriviallyEqual;
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final boolean left,
+            final boolean right,
+            DiffType type) {
+        if (triviallyEqual) {
+            return this;
+        }
+        if (left != right) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final boolean[] left,
+            final boolean[] right,
+            DiffType type) {
+        if (triviallyEqual) {
+            return this;
+        }
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final byte left, final byte right, DiffType type) {
+        if (triviallyEqual) {
+            return this;
+        }
+        if (left != right) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final byte[] left, final byte[] right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final char left, final char right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (left != right) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final char[] left, final char[] right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final double left, final double right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (Double.doubleToLongBits(left) != Double.doubleToLongBits(right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final double[] left,
+            final double[] right,
+            DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final float left, final float right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (Float.floatToIntBits(left) != Float.floatToIntBits(right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final float[] left,
+            final float[] right,
+            DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final int left, final int right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (left != right) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final int[] left, final int[] right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final long left, final long right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (left != right) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final long[] left, final long[] right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final short left, final short right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (left != right) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final short[] left,
+            final short[] right,
+            DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final Object left, final Object right, DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+        if (left == right) {
+            return this;
+        }
+
+        final Object objectToTest = Objects.requireNonNullElse(left, right);
+
+        if (objectToTest.getClass().isArray()) {
+            if (objectToTest instanceof boolean[]) {
+                return append(fieldName, (boolean[]) left, (boolean[]) right, type);
+            }
+            if (objectToTest instanceof byte[]) {
+                return append(fieldName, (byte[]) left, (byte[]) right, type);
+            }
+            if (objectToTest instanceof char[]) {
+                return append(fieldName, (char[]) left, (char[]) right, type);
+            }
+            if (objectToTest instanceof double[]) {
+                return append(fieldName, (double[]) left, (double[]) right, type);
+            }
+            if (objectToTest instanceof float[]) {
+                return append(fieldName, (float[]) left, (float[]) right, type);
+            }
+            if (objectToTest instanceof int[]) {
+                return append(fieldName, (int[]) left, (int[]) right, type);
+            }
+            if (objectToTest instanceof long[]) {
+                return append(fieldName, (long[]) left, (long[]) right, type);
+            }
+            if (objectToTest instanceof short[]) {
+                return append(fieldName, (short[]) left, (short[]) right, type);
+            }
+
+            return append(fieldName, (Object[]) left, (Object[]) right, type);
+        }
+
+        if (left != null && left.equals(right)) {
+            return this;
+        }
+
+        diffs.add(new Diff<>(fieldName, left, right, type));
+
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final Object[] left,
+            final Object[] right,
+            DiffType type) {
+
+        if (triviallyEqual) {
+            return this;
+        }
+
+        if (!Arrays.equals(left, right)) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+
+        return this;
+    }
+
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, @NonNull final DiffResult<?> diffResult) {
+        if (triviallyEqual) {
+            return this;
+        }
+        diffResult
+                .getDiffList()
+                .forEach(
+                        diff ->
+                                append(
+                                        fieldName + DELIMITER + diff.getFieldName(),
+                                        diff.getLeft(),
+                                        diff.getRight(),
+                                        diff.getType()));
+        return this;
+    }
+
+    @Override
+    public DiffResult<T> build() {
+        return new DiffResult<>(left, right, diffs);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffResult.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffResult.java
new file mode 100644
index 00000000..8b97b515
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffResult.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.diff;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import java.util.List;
+
+/**
+ * Contains a collection of the differences between two {@link Diffable} objects.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/DiffResult.java
+ */
+@Experimental
+@Getter
+public class DiffResult<T> {
+    @NonNull private final List<Diff<?>> diffList;
+    @NonNull private final T left;
+    @NonNull private final T right;
+    @NonNull private final DiffType type;
+
+    DiffResult(@NonNull T left, @NonNull T right, @NonNull List<Diff<?>> diffList) {
+        this.left = left;
+        this.right = right;
+        this.diffList = diffList;
+        this.type = getSpechChangeType(diffList);
+    }
+
+    public int getNumDiffs() {
+        return diffList.size();
+    }
+
+    @Override
+    public String toString() {
+        if (diffList.isEmpty()) {
+            return "";
+        }
+
+        final ToStringBuilder lhsBuilder =
+                new ToStringBuilder(left, ToStringStyle.SHORT_PREFIX_STYLE);
+        final ToStringBuilder rhsBuilder =
+                new ToStringBuilder(right, ToStringStyle.SHORT_PREFIX_STYLE);
+
+        diffList.forEach(
+                diff -> {
+                    lhsBuilder.append(diff.getFieldName(), diff.getLeft());
+                    rhsBuilder.append(diff.getFieldName(), diff.getRight());
+                });
+
+        return String.format("%s differs from %s", lhsBuilder.build(), rhsBuilder.build());
+    }
+
+    private static DiffType getSpechChangeType(List<Diff<?>> diffs) {
+        var type = DiffType.IGNORE;
+        for (var diff : diffs) {
+            type = DiffType.max(type, diff.getType());
+            if (type == DiffType.UPGRADE) {
+                return type;
+            }
+        }
+        return type;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffType.java
similarity index 59%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffType.java
index d32ab0e2..ae930354 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffType.java
@@ -15,27 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.reconciler.diff;
 
 import org.apache.flink.annotation.Experimental;
 
-import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/** JobManager spec. */
+/** Spec change type. */
 @Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobManagerSpec {
-    /** Resource specification for the JobManager pods. */
-    private Resource resource;
+public enum DiffType {
 
-    /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
-    private int replicas = 1;
+    /** Ignorable spec change. */
+    IGNORE,
+    /** Scalable spec change. */
+    SCALE,
+    /** Upgradable spec change. */
+    UPGRADE;
 
-    /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
-    private Pod podTemplate;
+    public static DiffType max(DiffType left, DiffType right) {
+        return (left.ordinal() >= right.ordinal()) ? left : right;
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diffable.java
similarity index 58%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diffable.java
index d32ab0e2..cd9919db 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diffable.java
@@ -15,27 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.reconciler.diff;
 
 import org.apache.flink.annotation.Experimental;
 
-import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/** JobManager spec. */
+/**
+ * {@link Diffable} classes can be compared with other {@link Diffable} objects for differences.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/Diffable.java
+ */
 @Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobManagerSpec {
-    /** Resource specification for the JobManager pods. */
-    private Resource resource;
-
-    /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
-    private int replicas = 1;
-
-    /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
-    private Pod podTemplate;
+@FunctionalInterface
+public interface Diffable<T> {
+    DiffResult<T> diff(T obj);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
new file mode 100644
index 00000000..c9b137b0
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.diff;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.NonNull;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.builder.Builder;
+import org.apache.commons.lang3.reflect.FieldUtils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.commons.lang3.reflect.FieldUtils.readField;
+import static org.apache.flink.kubernetes.operator.reconciler.diff.DiffType.UPGRADE;
+
+/**
+ * Assists in implementing {@link Diffable#diff(Object)} methods with reflection.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/ReflectionDiffBuilder.java
+ */
+@Experimental
+public class ReflectiveDiffBuilder<T> implements Builder<DiffResult<T>> {
+
+    private final Object left;
+    private final Object right;
+    private final DiffBuilder<T> diffBuilder;
+
+    public ReflectiveDiffBuilder(@NonNull final T lhs, @NonNull final T rhs) {
+        this.left = lhs;
+        this.right = rhs;
+        diffBuilder = new DiffBuilder<>(lhs, rhs);
+    }
+
+    @Override
+    public DiffResult<T> build() {
+        if (left.equals(right)) {
+            return diffBuilder.build();
+        }
+
+        appendFields(left.getClass());
+        return diffBuilder.build();
+    }
+
+    private void appendFields(final Class<?> clazz) {
+        for (final Field field : FieldUtils.getAllFields(clazz)) {
+            if (accept(field)) {
+                try {
+                    var leftField = readField(field, left, true);
+                    var rightField = readField(field, right, true);
+                    if (field.isAnnotationPresent(SpecDiff.Config.class)
+                            && Map.class.isAssignableFrom(field.getType())) {
+                        diffBuilder.append(
+                                field.getName(),
+                                configDiff(
+                                        field,
+                                        (leftField != null)
+                                                ? (Map<String, String>) leftField
+                                                : new HashMap<>(),
+                                        (rightField != null)
+                                                ? (Map<String, String>) rightField
+                                                : new HashMap<>()));
+                    } else if (field.isAnnotationPresent(SpecDiff.class)) {
+                        var annotation = field.getAnnotation(SpecDiff.class);
+                        diffBuilder.append(
+                                field.getName(), leftField, rightField, annotation.value());
+                    } else if (Diffable.class.isAssignableFrom(field.getType())
+                            && ObjectUtils.allNotNull(leftField, rightField)) {
+
+                        diffBuilder.append(
+                                field.getName(), ((Diffable<T>) leftField).diff((T) rightField));
+                    } else {
+                        diffBuilder.append(
+                                field.getName(),
+                                readField(field, left, true),
+                                readField(field, right, true),
+                                UPGRADE);
+                    }
+
+                } catch (final IllegalAccessException ex) {
+                    throw new RuntimeException(
+                            "Unexpected IllegalAccessException: " + ex.getMessage());
+                }
+            }
+        }
+    }
+
+    private boolean accept(final Field field) {
+        if (field.getName().indexOf(ClassUtils.INNER_CLASS_SEPARATOR_CHAR) != -1) {
+            return false;
+        }
+        if (Modifier.isTransient(field.getModifiers())) {
+            return false;
+        }
+        return !Modifier.isStatic(field.getModifiers());
+    }
+
+    private static DiffResult<Map<String, String>> configDiff(
+            Field field, Map<String, String> left, Map<String, String> right) {
+        var keys = new HashSet<String>();
+        keys.addAll(left.keySet());
+        keys.addAll(right.keySet());
+        var diffBuilder = new DiffBuilder<>(left, right);
+        var annotation = field.getAnnotation(SpecDiff.Config.class);
+
+        keys.forEach(
+                key -> {
+                    if (annotation != null) {
+                        DiffType diffType = getType(annotation, key);
+                        diffBuilder.append(key, left.get(key), right.get(key), diffType);
+                    } else {
+                        diffBuilder.append(key, left.get(key), right.get(key), UPGRADE);
+                    }
+                });
+
+        return diffBuilder.build();
+    }
+
+    private static DiffType getType(SpecDiff.Config annotation, String key) {
+        DiffType diffType = UPGRADE;
+        for (var entry : annotation.value()) {
+            if (key.startsWith(entry.prefix())) {
+                return entry.type();
+            }
+        }
+        return diffType;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiff.java
similarity index 53%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiff.java
index d32ab0e2..647eec73 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiff.java
@@ -15,27 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.reconciler.diff;
 
 import org.apache.flink.annotation.Experimental;
 
-import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
-/** JobManager spec. */
+/** Spec diff annotation. */
 @Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobManagerSpec {
-    /** Resource specification for the JobManager pods. */
-    private Resource resource;
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SpecDiff {
+    DiffType value() default DiffType.UPGRADE;
 
-    /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
-    private int replicas = 1;
+    /** Spec diff config annotation. */
+    @Target(ElementType.FIELD)
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface Config {
+        Entry[] value();
+    }
 
-    /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
-    private Pod podTemplate;
+    /** Spec diff config annotation entry. */
+    @Target(ElementType.FIELD)
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface Entry {
+        String prefix();
+
+        DiffType type();
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index d7aa1b98..560c5cc5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -93,4 +93,8 @@ public interface FlinkService {
     PodList getJmPodList(FlinkDeployment deployment, Configuration conf);
 
     void waitForClusterShutdown(Configuration conf);
+
+    default boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
+        return false;
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 08c2ef4e..8b46c91c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -33,6 +34,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
 import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
 import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -180,4 +182,40 @@ public class StandaloneFlinkService extends AbstractFlinkService {
                     .delete();
         }
     }
+
+    @Override
+    public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
+        if (conf.get(JobManagerOptions.SCHEDULER_MODE) == null) {
+            LOG.info("Reactive scaling is not enabled");
+            return false;
+        }
+
+        var clusterId = meta.getName();
+        var namespace = meta.getNamespace();
+        var name = StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId);
+        var deployment =
+                kubernetesClient.apps().deployments().inNamespace(namespace).withName(name);
+
+        if (deployment == null || deployment.get() == null) {
+            LOG.warn("TM Deployment ({}) not found", name);
+            return false;
+        }
+
+        var actualReplicas = deployment.get().getSpec().getReplicas();
+        var desiredReplicas =
+                conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
+        if (actualReplicas != desiredReplicas) {
+            LOG.info(
+                    "Scaling TM replicas: actual({}) -> desired({})",
+                    actualReplicas,
+                    desiredReplicas);
+            deployment.scale(desiredReplicas);
+        } else {
+            LOG.info(
+                    "Not scaling TM replicas: actual({}) == desired({})",
+                    actualReplicas,
+                    desiredReplicas);
+        }
+        return true;
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index b79a57f0..3d87fda1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -163,6 +164,9 @@ public class TestUtils {
                         .withUid(UUID.randomUUID().toString())
                         .withGeneration(1L)
                         .build());
+
+        Map<String, String> conf = new HashMap<>();
+        conf.put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "header");
         sessionJob.setSpec(
                 FlinkSessionJobSpec.builder()
                         .deploymentName(TEST_DEPLOYMENT_NAME)
@@ -173,6 +177,7 @@ public class TestUtils {
                                         .upgradeMode(UpgradeMode.STATELESS)
                                         .state(JobState.RUNNING)
                                         .build())
+                        .flinkConfiguration(conf)
                         .build());
         return sessionJob;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index ba929289..f25118e1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
@@ -40,6 +41,7 @@ import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -98,6 +100,7 @@ public class TestingFlinkService extends AbstractFlinkService {
     private Consumer<Configuration> listJobConsumer = conf -> {};
     private List<String> disposedSavepoints = new ArrayList<>();
     private Map<String, Boolean> savepointTriggers = new HashMap<>();
+    private int desiredReplicas = 0;
 
     public TestingFlinkService() {
         super(null, new FlinkConfigManager(new Configuration()));
@@ -476,4 +479,18 @@ public class TestingFlinkService extends AbstractFlinkService {
     public Map<String, String> getClusterInfo(Configuration conf) {
         return CLUSTER_INFO;
     }
+
+    @Override
+    public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
+        if (conf.get(JobManagerOptions.SCHEDULER_MODE) == null) {
+            return false;
+        }
+        desiredReplicas =
+                conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
+        return true;
+    }
+
+    public int getDesiredReplicas() {
+        return desiredReplicas;
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index ac4272b1..73a707b0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -19,7 +19,10 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
@@ -29,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
@@ -79,7 +83,7 @@ public class ApplicationReconcilerTest {
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
         var statusRecoder = new TestingStatusRecorder<FlinkDeployment, FlinkDeploymentStatus>();
-        flinkService = new TestingFlinkService();
+        flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         reconciler =
                 new ApplicationReconciler(
@@ -466,6 +470,7 @@ public class ApplicationReconcilerTest {
                 .put(
                         KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT.key(),
                         "true");
+        spDeployment.getSpec().setImage("flink:greatest");
         reconciler.reconcile(spDeployment, context);
         assertEquals(
                 "trigger_0",
@@ -530,4 +535,77 @@ public class ApplicationReconcilerTest {
                 UpgradeMode.SAVEPOINT,
                 reconStatus.deserializeLastReconciledSpec().getJob().getUpgradeMode());
     }
+
+    @Test
+    public void testScaleWithReactiveModeDisabled() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+
+        reconciler.reconcile(deployment, context);
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        deployment.getSpec().getJob().setParallelism(100);
+        reconciler.reconcile(deployment, context);
+        assertEquals(
+                JobState.SUSPENDED,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState());
+    }
+
+    @Test
+    public void testScaleWithReactiveModeEnabled() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(
+                        JobManagerOptions.SCHEDULER_MODE.key(),
+                        SchedulerExecutionMode.REACTIVE.name());
+
+        reconciler.reconcile(deployment, context);
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        // the default.parallelism is always ignored
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
+        reconciler.reconcile(deployment, context);
+        assertEquals(
+                JobState.RUNNING,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState());
+        assertEquals(0, flinkService.getDesiredReplicas());
+
+        deployment.getSpec().getJob().setParallelism(4);
+        reconciler.reconcile(deployment, context);
+        assertEquals(
+                JobState.RUNNING,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState());
+        assertEquals(2, flinkService.getDesiredReplicas());
+
+        deployment.getSpec().getJob().setParallelism(8);
+        reconciler.reconcile(deployment, context);
+        assertEquals(
+                JobState.RUNNING,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState());
+        assertEquals(4, flinkService.getDesiredReplicas());
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
new file mode 100644
index 00000000..6f39669a
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.diff;
+
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Spec diff test. */
+public class SpecDiffTest {
+
+    @Test
+    public void testFlinkDeploymentSpecChanges() {
+        var left = new FlinkDeploymentSpec();
+        var right = ReconciliationUtils.clone(left);
+        var diff = left.diff(right);
+        assertEquals(DiffType.IGNORE, diff.getType());
+        assertEquals(0, diff.getNumDiffs());
+
+        left = TestUtils.buildSessionCluster().getSpec();
+        right = ReconciliationUtils.clone(left);
+        diff = left.diff(right);
+        assertEquals(DiffType.IGNORE, diff.getType());
+        assertEquals(0, diff.getNumDiffs());
+
+        left = TestUtils.buildApplicationCluster().getSpec();
+        left.setPodTemplate(TestUtils.getTestPod("localhost", "v1", List.of()));
+        left.setIngress(IngressSpec.builder().template("template").build());
+
+        right = ReconciliationUtils.clone(left);
+        diff = left.diff(right);
+        assertEquals(DiffType.IGNORE, diff.getType());
+        assertEquals(0, diff.getNumDiffs());
+
+        assertEquals(0, diff.getNumDiffs());
+        right.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        right.getJob().setAllowNonRestoredState(true);
+        right.getJob().setInitialSavepointPath("local:///tmp");
+        right.getJob().setSavepointTriggerNonce(123L);
+        right.getFlinkConfiguration().put(OPERATOR_RECONCILE_INTERVAL.key(), "100 SECONDS");
+        right.getFlinkConfiguration().put(SCOPE_NAMING_KUBERNETES_OPERATOR.key(), "foo.bar");
+        right.getFlinkConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
+
+        diff = left.diff(right);
+        assertEquals(DiffType.IGNORE, diff.getType());
+        assertEquals(7, diff.getNumDiffs());
+
+        right.getFlinkConfiguration().remove(SCOPE_NAMING_KUBERNETES_OPERATOR.key());
+
+        diff = left.diff(right);
+        assertEquals(DiffType.IGNORE, diff.getType());
+        assertEquals(6, diff.getNumDiffs());
+
+        right.getJob().setParallelism(100);
+
+        diff = left.diff(right);
+        assertEquals(DiffType.SCALE, diff.getType());
+        assertEquals(7, diff.getNumDiffs());
+
+        right.setImage("flink:greatest");
+        right.setImagePullPolicy("never:pull");
+        right.setServiceAccount("anonymous");
+        right.setFlinkVersion(FlinkVersion.v1_13);
+        right.setMode(KubernetesDeploymentMode.STANDALONE);
+        right.setLogConfiguration(Map.of("foo", "bar"));
+
+        diff = left.diff(right);
+        assertEquals(DiffType.UPGRADE, diff.getType());
+        assertEquals(13, diff.getNumDiffs());
+
+        right.getJobManager().getResource().setMemory("999m");
+        right.getTaskManager().setReplicas(999);
+        right.getPodTemplate().setApiVersion("v2");
+        right.getIngress().setTemplate("none");
+
+        diff = left.diff(right);
+        assertEquals(DiffType.UPGRADE, diff.getType());
+        assertEquals(17, diff.getNumDiffs());
+
+        right.getJob().setJarURI("missing.jar");
+        right.getJob().setEntryClass("missing.Class");
+        right.getJob().setArgs(new String[] {"foo", "bar"});
+        right.getJob().setState(JobState.SUSPENDED);
+
+        diff = left.diff(right);
+        assertEquals(DiffType.UPGRADE, diff.getType());
+        assertEquals(21, diff.getNumDiffs());
+
+        right.getFlinkConfiguration().put(CoreOptions.FLINK_TM_JVM_OPTIONS.key(), "-Dfoo=bar");
+
+        diff = left.diff(right);
+        assertEquals(DiffType.UPGRADE, diff.getType());
+        assertEquals(22, diff.getNumDiffs());
+    }
+
+    @Test
+    public void testFlinkSessionJobSpecChanges() {
+        var left = new FlinkSessionJobSpec();
+        var right = ReconciliationUtils.clone(left);
+        var diff = left.diff(right);
+        assertEquals(DiffType.IGNORE, diff.getType());
+        assertEquals(0, diff.getNumDiffs());
+
+        left = TestUtils.buildSessionJob().getSpec();
+        right = ReconciliationUtils.clone(left);
+        diff = left.diff(right);
+        assertEquals(DiffType.IGNORE, diff.getType());
+        assertEquals(0, diff.getNumDiffs());
+
+        assertEquals(0, diff.getNumDiffs());
+        right.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        right.getJob().setAllowNonRestoredState(true);
+        right.getJob().setInitialSavepointPath("local:///tmp");
+        right.getJob().setSavepointTriggerNonce(123L);
+        right.getFlinkConfiguration()
+                .put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "changed");
+
+        diff = left.diff(right);
+        assertEquals(DiffType.IGNORE, diff.getType());
+        assertEquals(5, diff.getNumDiffs());
+
+        right.getJob().setParallelism(100);
+
+        diff = left.diff(right);
+        assertEquals(DiffType.SCALE, diff.getType());
+        assertEquals(6, diff.getNumDiffs());
+
+        right.setDeploymentName("missing");
+        right.getJob().setJarURI("missing.jar");
+        right.getJob().setEntryClass("missing.Class");
+        right.getJob().setArgs(new String[] {"foo", "bar"});
+        right.getJob().setState(JobState.SUSPENDED);
+
+        diff = left.diff(right);
+        assertEquals(DiffType.UPGRADE, diff.getType());
+        assertEquals(11, diff.getNumDiffs());
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index ec09f546..defe48a3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -18,15 +18,20 @@
 package org.apache.flink.kubernetes.operator.service;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -36,6 +41,8 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link StandaloneFlinkService unit tests */
 @EnableKubernetesMockClient(crud = true)
@@ -61,7 +68,7 @@ public class StandaloneFlinkServiceTest {
         FlinkDeployment flinkDeployment = TestUtils.buildSessionCluster();
         configuration = buildConfig(flinkDeployment, configuration);
 
-        createDeployments();
+        createDeployments(flinkDeployment);
 
         List<Deployment> deployments = kubernetesClient.apps().deployments().list().getItems();
 
@@ -80,7 +87,7 @@ public class StandaloneFlinkServiceTest {
         FlinkDeployment flinkDeployment = TestUtils.buildSessionCluster();
         configuration = buildConfig(flinkDeployment, configuration);
 
-        createDeployments();
+        createDeployments(flinkDeployment);
 
         List<Deployment> deployments = kubernetesClient.apps().deployments().list().getItems();
         assertEquals(2, deployments.size());
@@ -93,6 +100,83 @@ public class StandaloneFlinkServiceTest {
         assertEquals(0, deployments.size());
     }
 
+    @Test
+    public void testReactiveScale() throws Exception {
+        var flinkDeployment = TestUtils.buildApplicationCluster();
+        var clusterId = flinkDeployment.getMetadata().getName();
+        var namespace = flinkDeployment.getMetadata().getNamespace();
+        flinkDeployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+        flinkDeployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(
+                        JobManagerOptions.SCHEDULER_MODE.key(),
+                        SchedulerExecutionMode.REACTIVE.name());
+        createDeployments(flinkDeployment);
+        assertTrue(
+                flinkStandaloneService.scale(
+                        flinkDeployment.getMetadata(),
+                        flinkDeployment.getSpec().getJob(),
+                        buildConfig(flinkDeployment, configuration)));
+
+        assertEquals(
+                1,
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inNamespace(namespace)
+                        .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+                        .get()
+                        .getSpec()
+                        .getReplicas());
+
+        flinkDeployment.getSpec().getJob().setParallelism(4);
+        assertTrue(
+                flinkStandaloneService.scale(
+                        flinkDeployment.getMetadata(),
+                        flinkDeployment.getSpec().getJob(),
+                        buildConfig(flinkDeployment, configuration)));
+        assertEquals(
+                2,
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inNamespace(namespace)
+                        .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+                        .get()
+                        .getSpec()
+                        .getReplicas());
+
+        kubernetesClient
+                .apps()
+                .deployments()
+                .inNamespace(namespace)
+                .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+                .delete();
+        assertFalse(
+                flinkStandaloneService.scale(
+                        flinkDeployment.getMetadata(),
+                        flinkDeployment.getSpec().getJob(),
+                        buildConfig(flinkDeployment, configuration)));
+
+        createDeployments(flinkDeployment);
+        assertTrue(
+                flinkStandaloneService.scale(
+                        flinkDeployment.getMetadata(),
+                        flinkDeployment.getSpec().getJob(),
+                        buildConfig(flinkDeployment, configuration)));
+
+        flinkDeployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .remove(JobManagerOptions.SCHEDULER_MODE.key());
+        assertFalse(
+                flinkStandaloneService.scale(
+                        flinkDeployment.getMetadata(),
+                        flinkDeployment.getSpec().getJob(),
+                        buildConfig(flinkDeployment, configuration)));
+    }
+
     private Configuration buildConfig(FlinkDeployment flinkDeployment, Configuration configuration)
             throws Exception {
         return FlinkConfigBuilder.buildFrom(
@@ -102,29 +186,28 @@ public class StandaloneFlinkServiceTest {
                 configuration);
     }
 
-    private void createDeployments() {
+    private void createDeployments(AbstractFlinkResource cr) {
         Deployment jmDeployment = new Deployment();
         ObjectMeta jmMetadata = new ObjectMeta();
         jmMetadata.setName(
-                StandaloneKubernetesUtils.getJobManagerDeploymentName(
-                        TestUtils.TEST_DEPLOYMENT_NAME));
+                StandaloneKubernetesUtils.getJobManagerDeploymentName(cr.getMetadata().getName()));
         jmDeployment.setMetadata(jmMetadata);
         kubernetesClient
                 .apps()
                 .deployments()
-                .inNamespace(TestUtils.TEST_NAMESPACE)
-                .create(jmDeployment);
+                .inNamespace(cr.getMetadata().getNamespace())
+                .createOrReplace(jmDeployment);
 
         Deployment tmDeployment = new Deployment();
         ObjectMeta tmMetadata = new ObjectMeta();
         tmMetadata.setName(
-                StandaloneKubernetesUtils.getTaskManagerDeploymentName(
-                        TestUtils.TEST_DEPLOYMENT_NAME));
+                StandaloneKubernetesUtils.getTaskManagerDeploymentName(cr.getMetadata().getName()));
         tmDeployment.setMetadata(tmMetadata);
+        tmDeployment.setSpec(new DeploymentSpec());
         kubernetesClient
                 .apps()
                 .deployments()
-                .inNamespace(TestUtils.TEST_NAMESPACE)
-                .create(tmDeployment);
+                .inNamespace(cr.getMetadata().getNamespace())
+                .createOrReplace(tmDeployment);
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index dc2ac584..418bcb33 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -59,7 +59,7 @@ public class FlinkUtilsTest {
         Container container2 = new Container();
         container2.setName("container2");
 
-        Pod pod1 = TestUtils.getTestPod("pod1 hostname", "pod1 api version", List.of(container2));
+        Pod pod1 = TestUtils.getTestPod("pod1 hostname", "pod1 api version", List.of());
 
         Pod pod2 =
                 TestUtils.getTestPod(