You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/26 09:01:15 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28646] Handle scaling operation separately in reconciler/service
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 0a346ec7 [FLINK-28646] Handle scaling operation separately in reconciler/service
0a346ec7 is described below
commit 0a346ec7c6e8b770c1c10aa01686ee68c9dd4f27
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Thu Jul 28 18:06:51 2022 +0200
[FLINK-28646] Handle scaling operation separately in reconciler/service
---
examples/basic-reactive.yaml | 62 ++++
.../operator/crd/spec/AbstractFlinkSpec.java | 17 +-
.../operator/crd/spec/JobManagerSpec.java | 2 +
.../kubernetes/operator/crd/spec/JobSpec.java | 26 +-
.../operator/crd/spec/TaskManagerSpec.java | 2 +
.../operator/reconciler/ReconciliationUtils.java | 2 +-
.../AbstractFlinkResourceReconciler.java | 30 +-
.../deployment/AbstractJobReconciler.java | 22 +-
.../reconciler/deployment/SessionReconciler.java | 4 +-
.../diff/Diff.java} | 33 +-
.../operator/reconciler/diff/DiffBuilder.java | 351 +++++++++++++++++++++
.../operator/reconciler/diff/DiffResult.java | 84 +++++
.../diff/DiffType.java} | 29 +-
.../diff/Diffable.java} | 29 +-
.../reconciler/diff/ReflectiveDiffBuilder.java | 149 +++++++++
.../diff/SpecDiff.java} | 40 ++-
.../kubernetes/operator/service/FlinkService.java | 4 +
.../operator/service/StandaloneFlinkService.java | 38 +++
.../flink/kubernetes/operator/TestUtils.java | 5 +
.../kubernetes/operator/TestingFlinkService.java | 17 +
.../deployment/ApplicationReconcilerTest.java | 80 ++++-
.../operator/reconciler/diff/SpecDiffTest.java | 170 ++++++++++
.../service/StandaloneFlinkServiceTest.java | 105 +++++-
.../kubernetes/operator/utils/FlinkUtilsTest.java | 2 +-
24 files changed, 1203 insertions(+), 100 deletions(-)
diff --git a/examples/basic-reactive.yaml b/examples/basic-reactive.yaml
new file mode 100644
index 00000000..eddf58b3
--- /dev/null
+++ b/examples/basic-reactive.yaml
@@ -0,0 +1,62 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: basic-reactive-example
+spec:
+  image: flink:1.15
+  flinkVersion: v1_15
+  flinkConfiguration:
+    scheduler-mode: REACTIVE
+    taskmanager.numberOfTaskSlots: "2"
+    state.savepoints.dir: file:///flink-data/savepoints
+    state.checkpoints.dir: file:///flink-data/checkpoints
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: file:///flink-data/ha
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  podTemplate:
+    spec:
+      containers:
+        - name: flink-main-container
+          volumeMounts:
+            - mountPath: /flink-data
+              name: flink-volume
+      volumes:
+        - name: flink-volume
+          hostPath:
+            # directory location on host
+            path: /tmp/flink
+            # this field is optional
+            type: Directory
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    upgradeMode: savepoint
+    state: running
+    savepointTriggerNonce: 0
+  mode: standalone
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java
index 8407d904..d7d1e2a5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java
@@ -18,6 +18,11 @@
package org.apache.flink.kubernetes.operator.crd.spec;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
+import org.apache.flink.kubernetes.operator.reconciler.diff.Diffable;
+import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
+import org.apache.flink.kubernetes.operator.reconciler.diff.SpecDiff;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -32,7 +37,7 @@ import java.util.Map;
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
-public abstract class AbstractFlinkSpec {
+public abstract class AbstractFlinkSpec implements Diffable<AbstractFlinkSpec> {
/** Job specification for application deployments/session job. Null for session clusters. */
private JobSpec job;
@@ -44,5 +49,15 @@ public abstract class AbstractFlinkSpec {
private Long restartNonce;
/** Flink configuration overrides for the Flink deployment or Flink session job. */
+ @SpecDiff.Config({
+ @SpecDiff.Entry(prefix = "parallelism.default", type = DiffType.IGNORE),
+ @SpecDiff.Entry(prefix = "kubernetes.operator", type = DiffType.IGNORE),
+ @SpecDiff.Entry(prefix = "metrics.scope.k8soperator", type = DiffType.IGNORE)
+ })
private Map<String, String> flinkConfiguration;
+
+ /**
+ * Compares this spec with {@code spec} by delegating to a {@link ReflectiveDiffBuilder}.
+ *
+ * @param spec the spec to compare against; this instance is passed as the builder's first
+ * argument and {@code spec} as its second
+ * @return the {@link DiffResult} produced by the builder
+ */
+ @Override
+ public DiffResult<AbstractFlinkSpec> diff(AbstractFlinkSpec spec) {
+ return new ReflectiveDiffBuilder<>(this, spec).build();
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
index d32ab0e2..634e7016 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Experimental;
import io.fabric8.kubernetes.api.model.Pod;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -29,6 +30,7 @@ import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
+@Builder
public class JobManagerSpec {
/** Resource specification for the JobManager pods. */
private Resource resource;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
index a6574994..3583a5e3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -18,6 +18,11 @@
package org.apache.flink.kubernetes.operator.crd.spec;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
+import org.apache.flink.kubernetes.operator.reconciler.diff.Diffable;
+import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
+import org.apache.flink.kubernetes.operator.reconciler.diff.SpecDiff;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -31,7 +36,8 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder
-public class JobSpec {
+@EqualsAndHashCode
+public class JobSpec implements Diffable<JobSpec> {
/**
* URI of the job jar within the Flink docker container. For example:
@@ -40,6 +46,7 @@ public class JobSpec {
private String jarURI;
/** Parallelism of the Flink job. */
+ @SpecDiff(DiffType.SCALE)
private int parallelism;
/** Fully qualified main class name of the Flink job. */
@@ -55,17 +62,26 @@ public class JobSpec {
* Nonce used to manually trigger savepoint for the running job. In order to trigger a
* savepoint, change the number to anything other than the current value.
*/
- @EqualsAndHashCode.Exclude private Long savepointTriggerNonce;
+ @SpecDiff(DiffType.IGNORE)
+ private Long savepointTriggerNonce;
/**
* Savepoint path used by the job the first time it is deployed. Upgrades/redeployments will not
* be affected.
*/
- @EqualsAndHashCode.Exclude private String initialSavepointPath;
+ @SpecDiff(DiffType.IGNORE)
+ private String initialSavepointPath;
/** Upgrade mode of the Flink job. */
- @EqualsAndHashCode.Exclude private UpgradeMode upgradeMode = UpgradeMode.STATELESS;
+ @SpecDiff(DiffType.IGNORE)
+ private UpgradeMode upgradeMode = UpgradeMode.STATELESS;
/** Allow checkpoint state that cannot be mapped to any job vertex in tasks. */
- @EqualsAndHashCode.Exclude private Boolean allowNonRestoredState;
+ @SpecDiff(DiffType.IGNORE)
+ private Boolean allowNonRestoredState;
+
+ /**
+ * Compares this job spec with {@code spec} by delegating to a {@link ReflectiveDiffBuilder}.
+ *
+ * @param spec the job spec to compare against; this instance is passed as the builder's first
+ * argument and {@code spec} as its second
+ * @return the {@link DiffResult} produced by the builder
+ */
+ @Override
+ public DiffResult<JobSpec> diff(JobSpec spec) {
+ return new ReflectiveDiffBuilder<>(this, spec).build();
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
index 400a2cc6..860ecd91 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.model.annotation.SpecReplicas;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -30,6 +31,7 @@ import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
+@Builder
public class TaskManagerSpec {
/** Resource specification for the TaskManager pods. */
private Resource resource;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index dc6bd6c2..389e59d6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -265,8 +265,8 @@ public class ReconciliationUtils {
*
* @param specWithMetaString JSON string.
* @param specClass Spec class for deserialization.
- * @return Tuple2 of spec and meta.
* @param <T> Spec type.
+ * @return Tuple2 of spec and meta.
*/
public static <T extends AbstractFlinkSpec>
Tuple2<T, ReconciliationMetadata> deserializeSpecWithMeta(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index c51ba32c..436f96ec 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -67,7 +68,8 @@ public abstract class AbstractFlinkResourceReconciler<
protected final KubernetesClient kubernetesClient;
public static final String MSG_SUSPENDED = "Suspending existing deployment.";
- public static final String MSG_SPEC_CHANGED = "Detected spec change, starting reconciliation.";
+ public static final String MSG_SPEC_CHANGED =
+ "%s change(s) detected (%s), starting reconciliation.";
public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
public static final String MSG_SUBMIT = "Starting deployment";
@@ -122,25 +124,33 @@ public abstract class AbstractFlinkResourceReconciler<
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
SPEC currentDeploySpec = cr.getSpec();
+ var specDiff = currentDeploySpec.diff(lastReconciledSpec);
+
+ var flinkService = getFlinkService(cr, ctx);
+
boolean specChanged =
- reconciliationStatus.getState() == ReconciliationState.UPGRADING
- || !currentDeploySpec.equals(lastReconciledSpec);
+ DiffType.IGNORE != specDiff.getType()
+ || reconciliationStatus.getState() == ReconciliationState.UPGRADING;
+
var observeConfig = getObserveConfig(cr, ctx);
- var flinkService = getFlinkService(cr, ctx);
+
if (specChanged) {
+
if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
return;
}
- LOG.info(MSG_SPEC_CHANGED);
+
+ var specChangeMessage = String.format(MSG_SPEC_CHANGED, specDiff.getType(), specDiff);
+ LOG.info(specChangeMessage);
if (reconciliationStatus.getState() != ReconciliationState.UPGRADING) {
eventRecorder.triggerEvent(
cr,
EventRecorder.Type.Normal,
EventRecorder.Reason.SpecChanged,
EventRecorder.Component.JobManagerDeployment,
- MSG_SPEC_CHANGED);
+ specChangeMessage);
}
- reconcileSpecChange(cr, ctx, observeConfig, deployConfig);
+ reconcileSpecChange(cr, ctx, observeConfig, deployConfig, specDiff.getType());
} else if (shouldRollBack(cr, observeConfig, flinkService)) {
// Rollbacks are executed in two steps, we initiate it first then return
if (initiateRollBack(status)) {
@@ -200,7 +210,11 @@ public abstract class AbstractFlinkResourceReconciler<
* @throws Exception Error during spec upgrade.
*/
protected abstract void reconcileSpecChange(
- CR cr, Context<?> ctx, Configuration observeConfig, Configuration deployConfig)
+ CR cr,
+ Context<?> ctx,
+ Configuration observeConfig,
+ Configuration deployConfig,
+ DiffType diffType)
throws Exception;
/**
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index fea52829..4f850abe 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -78,13 +79,32 @@ public abstract class AbstractJobReconciler<
@Override
protected void reconcileSpecChange(
- CR resource, Context<?> ctx, Configuration observeConfig, Configuration deployConfig)
+ CR resource,
+ Context<?> ctx,
+ Configuration observeConfig,
+ Configuration deployConfig,
+ DiffType diffType)
throws Exception {
+
STATUS status = resource.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
SPEC currentDeploySpec = resource.getSpec();
+ if (diffType == DiffType.SCALE) {
+ boolean scaled =
+ getFlinkService(resource, ctx)
+ .scale(
+ resource.getMetadata(),
+ resource.getSpec().getJob(),
+ deployConfig);
+ if (scaled) {
+ LOG.info("Reactive scaling succeeded");
+ ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
+ return;
+ }
+ }
+
JobState currentJobState = lastReconciledSpec.getJob().getState();
JobState desiredJobState = currentDeploySpec.getJob().getState();
if (currentJobState == JobState.RUNNING) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 2e2efcf6..1131a5e9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.diff.DiffType;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -92,7 +93,8 @@ public class SessionReconciler
FlinkDeployment deployment,
Context<?> ctx,
Configuration observeConfig,
- Configuration deployConfig)
+ Configuration deployConfig,
+ DiffType type)
throws Exception {
deleteSessionCluster(deployment, observeConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diff.java
similarity index 59%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diff.java
index d32ab0e2..9f68302f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diff.java
@@ -15,27 +15,24 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.reconciler.diff;
import org.apache.flink.annotation.Experimental;
-import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.Value;
-/** JobManager spec. */
+/**
+ * Contains the differences between two {@link Diffable} class fields.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/Diff.java
+ */
@Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobManagerSpec {
- /** Resource specification for the JobManager pods. */
- private Resource resource;
-
- /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
- private int replicas = 1;
-
- /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
- private Pod podTemplate;
+@Value
+public class Diff<T> {
+ /** Name of the field whose values differ; never {@code null}. */
+ @NonNull String fieldName;
+ /** Value taken from the left-hand side of the comparison; not null-checked. */
+ T left;
+ /** Value taken from the right-hand side of the comparison; not null-checked. */
+ T right;
+ /** Classification attached to this difference; never {@code null}. */
+ @NonNull DiffType type;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffBuilder.java
new file mode 100644
index 00000000..3817cc93
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffBuilder.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.diff;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.NonNull;
+import org.apache.commons.lang3.builder.Builder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Assists in implementing {@link Diffable#diff(Object)} methods.
+ *
+ * <p>Every {@code append} overload compares one pair of field values and, when they differ,
+ * records a {@link Diff} carrying the supplied {@link DiffType}. When the two objects under
+ * comparison were found equal at construction time, all {@code append} calls record nothing.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/DiffBuilder.java
+ *
+ * @param <T> type of the two objects being compared
+ */
+@Experimental
+public class DiffBuilder<T> implements Builder<DiffResult<?>> {
+
+    /** Separator placed between a parent field name and a nested field name. */
+    private static final String DELIMITER = ".";
+
+    private final T left;
+    private final T right;
+
+    /** Differences recorded so far, in append order. */
+    private final List<Diff<?>> diffs;
+
+    /** While {@code true}, every {@code append} call returns without recording anything. */
+    private boolean triviallyEqual;
+
+    /**
+     * Creates a builder for the given pair of objects.
+     *
+     * @param left the object on the left-hand side of the comparison
+     * @param right the object on the right-hand side of the comparison
+     * @throws NullPointerException if either argument is {@code null}
+     */
+    public DiffBuilder(@NonNull final T left, @NonNull final T right) {
+        this.diffs = new ArrayList<>();
+        this.left = left;
+        this.right = right;
+        this.triviallyEqual = left == right || left.equals(right);
+    }
+
+    /**
+     * Controls whether the trivially-equal shortcut computed by the constructor stays active.
+     *
+     * @param testTriviallyEqual {@code false} switches the shortcut off so that appended fields
+     *     are always evaluated; {@code true} leaves the constructor's result untouched
+     * @return this builder
+     */
+    public DiffBuilder<T> testTriviallyEqual(boolean testTriviallyEqual) {
+        this.triviallyEqual = this.triviallyEqual && testTriviallyEqual;
+        return this;
+    }
+
+    /** Records a diff if the two {@code boolean} values differ. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final boolean left,
+            final boolean right,
+            DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, left != right);
+    }
+
+    /** Records a diff if the two {@code boolean[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final boolean[] left,
+            final boolean[] right,
+            DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /** Records a diff if the two {@code byte} values differ. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final byte left, final byte right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, left != right);
+    }
+
+    /** Records a diff if the two {@code byte[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final byte[] left, final byte[] right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /** Records a diff if the two {@code char} values differ. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final char left, final char right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, left != right);
+    }
+
+    /** Records a diff if the two {@code char[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final char[] left, final char[] right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /**
+     * Records a diff if the two {@code double} values differ. The comparison is done on the
+     * {@link Double#doubleToLongBits(double)} representation, so {@code NaN} equals {@code NaN}
+     * and {@code 0.0} differs from {@code -0.0}.
+     */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final double left, final double right, DiffType type) {
+        final boolean different = Double.doubleToLongBits(left) != Double.doubleToLongBits(right);
+        return appendIfDifferent(fieldName, left, right, type, different);
+    }
+
+    /** Records a diff if the two {@code double[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final double[] left,
+            final double[] right,
+            DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /**
+     * Records a diff if the two {@code float} values differ. The comparison is done on the
+     * {@link Float#floatToIntBits(float)} representation, so {@code NaN} equals {@code NaN} and
+     * {@code 0.0f} differs from {@code -0.0f}.
+     */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final float left, final float right, DiffType type) {
+        final boolean different = Float.floatToIntBits(left) != Float.floatToIntBits(right);
+        return appendIfDifferent(fieldName, left, right, type, different);
+    }
+
+    /** Records a diff if the two {@code float[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final float[] left,
+            final float[] right,
+            DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /** Records a diff if the two {@code int} values differ. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final int left, final int right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, left != right);
+    }
+
+    /** Records a diff if the two {@code int[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final int[] left, final int[] right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /** Records a diff if the two {@code long} values differ. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final long left, final long right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, left != right);
+    }
+
+    /** Records a diff if the two {@code long[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final long[] left, final long[] right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /** Records a diff if the two {@code short} values differ. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final short left, final short right, DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, left != right);
+    }
+
+    /** Records a diff if the two {@code short[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final short[] left,
+            final short[] right,
+            DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /**
+     * Records a diff if the two objects differ.
+     *
+     * <p>Identical references (including two {@code null}s) never produce a diff. Arrays are
+     * compared element-wise; two arrays of different kinds, or an array and a non-array value,
+     * are reported as a difference instead of failing with a {@link ClassCastException}. Any
+     * other pair is compared with {@code equals}.
+     *
+     * @param fieldName name of the compared field
+     * @param left left-hand value, may be {@code null}
+     * @param right right-hand value, may be {@code null}
+     * @param type classification to attach to a recorded diff
+     * @return this builder
+     */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, final Object left, final Object right, DiffType type) {
+        // Checked up front so that equals() of the operands is not invoked needlessly.
+        if (triviallyEqual || left == right) {
+            return this;
+        }
+
+        // At most one operand is null here; inspect whichever one is present.
+        final Object objectToTest = Objects.requireNonNullElse(left, right);
+        final boolean equal =
+                objectToTest.getClass().isArray()
+                        ? arraysEqual(left, right)
+                        : Objects.equals(left, right);
+        return appendIfDifferent(fieldName, left, right, type, !equal);
+    }
+
+    /** Records a diff if the two {@code Object[]} values are not element-wise equal. */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName,
+            final Object[] left,
+            final Object[] right,
+            DiffType type) {
+        return appendIfDifferent(fieldName, left, right, type, !Arrays.equals(left, right));
+    }
+
+    /**
+     * Re-appends every diff of a nested result, prefixing each nested field name with {@code
+     * fieldName} and the {@code "."} delimiter.
+     *
+     * @param fieldName name of the field that holds the nested object
+     * @param diffResult diff result computed for the nested object
+     * @return this builder
+     */
+    public DiffBuilder<T> append(
+            @NonNull final String fieldName, @NonNull final DiffResult<?> diffResult) {
+        if (triviallyEqual) {
+            return this;
+        }
+        for (Diff<?> nested : diffResult.getDiffList()) {
+            append(
+                    fieldName + DELIMITER + nested.getFieldName(),
+                    nested.getLeft(),
+                    nested.getRight(),
+                    nested.getType());
+        }
+        return this;
+    }
+
+    /**
+     * Builds the result from the two compared objects and the diffs recorded so far.
+     *
+     * @return the diff result
+     */
+    @Override
+    public DiffResult<T> build() {
+        return new DiffResult<>(left, right, diffs);
+    }
+
+    /** Shared tail of all {@code append} overloads: records a diff unless trivially equal. */
+    private DiffBuilder<T> appendIfDifferent(
+            final String fieldName,
+            final Object left,
+            final Object right,
+            final DiffType type,
+            final boolean different) {
+        if (!triviallyEqual && different) {
+            diffs.add(new Diff<>(fieldName, left, right, type));
+        }
+        return this;
+    }
+
+    /** Element-wise equality for two values of which at least one is an array. */
+    private static boolean arraysEqual(final Object left, final Object right) {
+        if (left == null || right == null) {
+            return left == right;
+        }
+        if (left instanceof boolean[] && right instanceof boolean[]) {
+            return Arrays.equals((boolean[]) left, (boolean[]) right);
+        }
+        if (left instanceof byte[] && right instanceof byte[]) {
+            return Arrays.equals((byte[]) left, (byte[]) right);
+        }
+        if (left instanceof char[] && right instanceof char[]) {
+            return Arrays.equals((char[]) left, (char[]) right);
+        }
+        if (left instanceof double[] && right instanceof double[]) {
+            return Arrays.equals((double[]) left, (double[]) right);
+        }
+        if (left instanceof float[] && right instanceof float[]) {
+            return Arrays.equals((float[]) left, (float[]) right);
+        }
+        if (left instanceof int[] && right instanceof int[]) {
+            return Arrays.equals((int[]) left, (int[]) right);
+        }
+        if (left instanceof long[] && right instanceof long[]) {
+            return Arrays.equals((long[]) left, (long[]) right);
+        }
+        if (left instanceof short[] && right instanceof short[]) {
+            return Arrays.equals((short[]) left, (short[]) right);
+        }
+        if (left instanceof Object[] && right instanceof Object[]) {
+            return Arrays.equals((Object[]) left, (Object[]) right);
+        }
+        // Different array kinds, or an array compared with a non-array value.
+        return false;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffResult.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffResult.java
new file mode 100644
index 00000000..8b97b515
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffResult.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.diff;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import java.util.List;
+
+/**
+ * Holds the differences found between two {@link Diffable} objects together with the highest
+ * {@link DiffType} among them.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/DiffResult.java
+ */
+@Experimental
+@Getter
+public class DiffResult<T> {
+    @NonNull private final List<Diff<?>> diffList;
+    @NonNull private final T left;
+    @NonNull private final T right;
+    @NonNull private final DiffType type;
+
+    /**
+     * @param left left-hand object of the comparison
+     * @param right right-hand object of the comparison
+     * @param diffList differences between the two objects; also determines {@code type}
+     */
+    DiffResult(@NonNull T left, @NonNull T right, @NonNull List<Diff<?>> diffList) {
+        this.left = left;
+        this.right = right;
+        this.diffList = diffList;
+        this.type = getSpecChangeType(diffList);
+    }
+
+    /** Returns the number of recorded differences. */
+    public int getNumDiffs() {
+        return diffList.size();
+    }
+
+    /**
+     * Renders only the differing fields of both sides, or an empty string when there are no
+     * differences.
+     */
+    @Override
+    public String toString() {
+        if (diffList.isEmpty()) {
+            return "";
+        }
+
+        final ToStringBuilder leftText =
+                new ToStringBuilder(left, ToStringStyle.SHORT_PREFIX_STYLE);
+        final ToStringBuilder rightText =
+                new ToStringBuilder(right, ToStringStyle.SHORT_PREFIX_STYLE);
+
+        for (final Diff<?> diff : diffList) {
+            leftText.append(diff.getFieldName(), diff.getLeft());
+            rightText.append(diff.getFieldName(), diff.getRight());
+        }
+
+        return String.format("%s differs from %s", leftText.build(), rightText.build());
+    }
+
+    /**
+     * Folds the diff types with {@link DiffType#max}, starting from {@link DiffType#IGNORE} and
+     * stopping early once {@link DiffType#UPGRADE} is reached.
+     */
+    private static DiffType getSpecChangeType(List<Diff<?>> diffs) {
+        DiffType highest = DiffType.IGNORE;
+        for (final Diff<?> diff : diffs) {
+            highest = DiffType.max(highest, diff.getType());
+            if (highest == DiffType.UPGRADE) {
+                break;
+            }
+        }
+        return highest;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffType.java
similarity index 59%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffType.java
index d32ab0e2..ae930354 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/DiffType.java
@@ -15,27 +15,22 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.reconciler.diff;
import org.apache.flink.annotation.Experimental;
-import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/** JobManager spec. */
+/** Spec change type. */
@Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobManagerSpec {
- /** Resource specification for the JobManager pods. */
- private Resource resource;
+public enum DiffType {
- /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
- private int replicas = 1;
+ /** Ignorable spec change. */
+ IGNORE,
+ /** Scalable spec change. */
+ SCALE,
+ /** Upgradable spec change. */
+ UPGRADE;
- /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
- private Pod podTemplate;
+ /**
+  * Returns whichever of the two types is declared later in this enum; {@code left} wins when
+  * both are the same constant.
+  */
+ public static DiffType max(DiffType left, DiffType right) {
+     // Enum#compareTo orders constants by declaration position.
+     return left.compareTo(right) >= 0 ? left : right;
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diffable.java
similarity index 58%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diffable.java
index d32ab0e2..cd9919db 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/Diffable.java
@@ -15,27 +15,18 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.reconciler.diff;
import org.apache.flink.annotation.Experimental;
-import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/** JobManager spec. */
+/**
+ * {@link Diffable} classes can be compared with other {@link Diffable} objects for differences.
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/Diffable.java
+ */
@Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobManagerSpec {
- /** Resource specification for the JobManager pods. */
- private Resource resource;
-
- /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
- private int replicas = 1;
-
- /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
- private Pod podTemplate;
+@FunctionalInterface
+public interface Diffable<T> {
+ /**
+  * Compares this object with {@code obj} and reports the differences.
+  *
+  * @param obj the object to compare with
+  * @return the differences as a {@link DiffResult}
+  */
+ DiffResult<T> diff(T obj);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
new file mode 100644
index 00000000..c9b137b0
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/ReflectiveDiffBuilder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.diff;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.NonNull;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.builder.Builder;
+import org.apache.commons.lang3.reflect.FieldUtils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.commons.lang3.reflect.FieldUtils.readField;
+import static org.apache.flink.kubernetes.operator.reconciler.diff.DiffType.UPGRADE;
+
+/**
+ * Assists in implementing {@link Diffable#diff(Object)} methods with reflection.
+ *
+ * <p>Every accepted field (non-static, non-transient, no inner-class separator in its name) of
+ * the left object's class is read from both objects and handled as follows:
+ *
+ * <ul>
+ *   <li>{@link SpecDiff.Config} on a {@link Map} field: entries are compared key by key; the
+ *       diff type of a key comes from the first annotation entry whose prefix the key starts
+ *       with, {@link DiffType#UPGRADE} otherwise.
+ *   <li>{@link SpecDiff}: the values are compared as a whole with the annotated diff type.
+ *   <li>{@link Diffable} field that is non-null on both sides: its nested diff is appended.
+ *   <li>Anything else: the values are compared as a whole with {@link DiffType#UPGRADE}.
+ * </ul>
+ *
+ * <p>Inspired by:
+ * https://github.com/apache/commons-lang/blob/master/src/main/java/org/apache/commons/lang3/builder/ReflectionDiffBuilder.java
+ */
+@Experimental
+public class ReflectiveDiffBuilder<T> implements Builder<DiffResult<T>> {
+
+    private final Object left;
+    private final Object right;
+    private final DiffBuilder<T> diffBuilder;
+
+    /**
+     * @param lhs left-hand object of the comparison
+     * @param rhs right-hand object of the comparison
+     */
+    public ReflectiveDiffBuilder(@NonNull final T lhs, @NonNull final T rhs) {
+        this.left = lhs;
+        this.right = rhs;
+        diffBuilder = new DiffBuilder<>(lhs, rhs);
+    }
+
+    /**
+     * Builds the diff result; fields are only inspected when the two objects are not equal.
+     *
+     * @return the differences between the left and right object
+     * @throws RuntimeException if a field cannot be read reflectively
+     */
+    @Override
+    public DiffResult<T> build() {
+        if (!left.equals(right)) {
+            appendFields(left.getClass());
+        }
+        return diffBuilder.build();
+    }
+
+    /** Reads every accepted field of {@code clazz} from both objects and appends its diff. */
+    private void appendFields(final Class<?> clazz) {
+        for (final Field field : FieldUtils.getAllFields(clazz)) {
+            if (!accept(field)) {
+                continue;
+            }
+            try {
+                appendField(field, readField(field, left, true), readField(field, right, true));
+            } catch (final IllegalAccessException ex) {
+                // Keep the original exception as the cause so the stack trace is not lost.
+                throw new RuntimeException(
+                        "Unexpected IllegalAccessException: " + ex.getMessage(), ex);
+            }
+        }
+    }
+
+    /** Appends the diff of a single field, choosing the strategy from annotations and type. */
+    private void appendField(final Field field, final Object leftField, final Object rightField) {
+        final String name = field.getName();
+        if (field.isAnnotationPresent(SpecDiff.Config.class)
+                && Map.class.isAssignableFrom(field.getType())) {
+            diffBuilder.append(
+                    name, configDiff(field, asConfigMap(leftField), asConfigMap(rightField)));
+        } else if (field.isAnnotationPresent(SpecDiff.class)) {
+            diffBuilder.append(
+                    name, leftField, rightField, field.getAnnotation(SpecDiff.class).value());
+        } else if (Diffable.class.isAssignableFrom(field.getType())
+                && ObjectUtils.allNotNull(leftField, rightField)) {
+            diffBuilder.append(name, nestedDiff(leftField, rightField));
+        } else {
+            // The values read above are reused instead of reading the field a second time.
+            diffBuilder.append(name, leftField, rightField, UPGRADE);
+        }
+    }
+
+    /** Treats a missing config map as empty. */
+    @SuppressWarnings("unchecked") // only called for fields whose declared type is a Map
+    private static Map<String, String> asConfigMap(final Object value) {
+        return (value != null) ? (Map<String, String>) value : new HashMap<>();
+    }
+
+    /** Delegates to the {@link Diffable#diff(Object)} implementation of the field value. */
+    @SuppressWarnings("unchecked") // only called for fields whose declared type is Diffable
+    private static DiffResult<?> nestedDiff(final Object leftValue, final Object rightValue) {
+        return ((Diffable<Object>) leftValue).diff(rightValue);
+    }
+
+    /** Rejects synthetic-looking (name contains '$'), transient and static fields. */
+    private static boolean accept(final Field field) {
+        final int modifiers = field.getModifiers();
+        return field.getName().indexOf(ClassUtils.INNER_CLASS_SEPARATOR_CHAR) == -1
+                && !Modifier.isTransient(modifiers)
+                && !Modifier.isStatic(modifiers);
+    }
+
+    /** Compares two config maps key by key over the union of their keys. */
+    private static DiffResult<Map<String, String>> configDiff(
+            Field field, Map<String, String> left, Map<String, String> right) {
+        var keys = new HashSet<String>();
+        keys.addAll(left.keySet());
+        keys.addAll(right.keySet());
+
+        var builder = new DiffBuilder<>(left, right);
+        var annotation = field.getAnnotation(SpecDiff.Config.class);
+        for (var key : keys) {
+            var diffType = (annotation != null) ? getType(annotation, key) : UPGRADE;
+            builder.append(key, left.get(key), right.get(key), diffType);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Returns the type of the first annotation entry whose prefix {@code key} starts with, or
+     * {@link DiffType#UPGRADE} when no entry matches.
+     */
+    private static DiffType getType(SpecDiff.Config annotation, String key) {
+        for (var entry : annotation.value()) {
+            if (key.startsWith(entry.prefix())) {
+                return entry.type();
+            }
+        }
+        return UPGRADE;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiff.java
similarity index 53%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiff.java
index d32ab0e2..647eec73 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiff.java
@@ -15,27 +15,35 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.reconciler.diff;
import org.apache.flink.annotation.Experimental;
-import io.fabric8.kubernetes.api.model.Pod;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
-/** JobManager spec. */
+/** Spec diff annotation. */
@Experimental
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobManagerSpec {
- /** Resource specification for the JobManager pods. */
- private Resource resource;
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SpecDiff {
+ DiffType value() default DiffType.UPGRADE;
- /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
- private int replicas = 1;
+ /** Spec diff config annotation. */
+ @Target(ElementType.FIELD)
+ @Retention(RetentionPolicy.RUNTIME)
+ @interface Config {
+ Entry[] value();
+ }
- /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
- private Pod podTemplate;
+ /** Spec diff config annotation entry. */
+ @Target(ElementType.FIELD)
+ @Retention(RetentionPolicy.RUNTIME)
+ @interface Entry {
+ String prefix();
+
+ DiffType type();
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index d7aa1b98..560c5cc5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -93,4 +93,8 @@ public interface FlinkService {
PodList getJmPodList(FlinkDeployment deployment, Configuration conf);
void waitForClusterShutdown(Configuration conf);
+
+ /**
+  * Scaling hook. This default implementation performs no scaling.
+  *
+  * @return {@code false} in this default implementation
+  */
+ default boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
+ return false;
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 08c2ef4e..8b46c91c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -33,6 +34,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -180,4 +182,40 @@ public class StandaloneFlinkService extends AbstractFlinkService {
.delete();
}
}
+
+ /**
+  * Scales the TaskManager Deployment of the cluster to the configured number of replicas.
+  *
+  * <p>Nothing is done when {@link JobManagerOptions#SCHEDULER_MODE} is not set in {@code conf}
+  * or when the TaskManager Deployment cannot be found.
+  *
+  * @param meta metadata supplying the cluster id (resource name) and the namespace
+  * @param jobSpec job spec; not read by this implementation
+  * @param conf configuration holding the scheduler mode and the desired TaskManager replicas
+  * @return {@code true} when the scheduler mode is set and the Deployment exists, whether or
+  *     not the replica count had to be changed; {@code false} otherwise
+  */
+ @Override
+ public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
+     if (conf.get(JobManagerOptions.SCHEDULER_MODE) == null) {
+         LOG.info("Reactive scaling is not enabled");
+         return false;
+     }
+
+     var clusterId = meta.getName();
+     var namespace = meta.getNamespace();
+     var name = StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId);
+     var deployment =
+             kubernetesClient.apps().deployments().inNamespace(namespace).withName(name);
+
+     // Fetch the Deployment once: a second get() could return null after the null check
+     // passed, and it would cost another API call.
+     var current = (deployment != null) ? deployment.get() : null;
+     if (current == null) {
+         LOG.warn("TM Deployment ({}) not found", name);
+         return false;
+     }
+
+     // Unbox the desired value so the comparison below is numeric. Comparing two Integer
+     // objects with != compares references and misreports equal counts outside the
+     // Integer cache range.
+     Integer actualReplicas = current.getSpec().getReplicas();
+     int desiredReplicas =
+             conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
+     if (actualReplicas == null || actualReplicas != desiredReplicas) {
+         LOG.info(
+                 "Scaling TM replicas: actual({}) -> desired({})",
+                 actualReplicas,
+                 desiredReplicas);
+         deployment.scale(desiredReplicas);
+     } else {
+         LOG.info(
+                 "Not scaling TM replicas: actual({}) == desired({})",
+                 actualReplicas,
+                 desiredReplicas);
+     }
+     return true;
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index b79a57f0..3d87fda1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -163,6 +164,9 @@ public class TestUtils {
.withUid(UUID.randomUUID().toString())
.withGeneration(1L)
.build());
+
+ Map<String, String> conf = new HashMap<>();
+ conf.put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "header");
sessionJob.setSpec(
FlinkSessionJobSpec.builder()
.deploymentName(TEST_DEPLOYMENT_NAME)
@@ -173,6 +177,7 @@ public class TestUtils {
.upgradeMode(UpgradeMode.STATELESS)
.state(JobState.RUNNING)
.build())
+ .flinkConfiguration(conf)
.build());
return sessionJob;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index ba929289..f25118e1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
@@ -40,6 +41,7 @@ import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -98,6 +100,7 @@ public class TestingFlinkService extends AbstractFlinkService {
private Consumer<Configuration> listJobConsumer = conf -> {};
private List<String> disposedSavepoints = new ArrayList<>();
private Map<String, Boolean> savepointTriggers = new HashMap<>();
+ private int desiredReplicas = 0;
public TestingFlinkService() {
super(null, new FlinkConfigManager(new Configuration()));
@@ -476,4 +479,18 @@ public class TestingFlinkService extends AbstractFlinkService {
public Map<String, String> getClusterInfo(Configuration conf) {
return CLUSTER_INFO;
}
+
+ /**
+  * Test double for scaling: when the scheduler mode is set, remembers the configured
+  * TaskManager replica count instead of touching any cluster.
+  *
+  * @return {@code true} if the scheduler mode is set in {@code conf}, {@code false} otherwise
+  */
+ @Override
+ public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
+     final boolean schedulerModeSet = conf.get(JobManagerOptions.SCHEDULER_MODE) != null;
+     if (schedulerModeSet) {
+         desiredReplicas =
+                 conf.get(
+                         StandaloneKubernetesConfigOptionsInternal
+                                 .KUBERNETES_TASKMANAGER_REPLICAS);
+     }
+     return schedulerModeSet;
+ }
+
+ /** Returns the replica count stored by the last successful {@code scale} call (initially 0). */
+ public int getDesiredReplicas() {
+ return desiredReplicas;
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index ac4272b1..73a707b0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -19,7 +19,10 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
@@ -29,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
@@ -79,7 +83,7 @@ public class ApplicationReconcilerTest {
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
var statusRecoder = new TestingStatusRecorder<FlinkDeployment, FlinkDeploymentStatus>();
- flinkService = new TestingFlinkService();
+ flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
reconciler =
new ApplicationReconciler(
@@ -466,6 +470,7 @@ public class ApplicationReconcilerTest {
.put(
KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT.key(),
"true");
+ spDeployment.getSpec().setImage("flink:greatest");
reconciler.reconcile(spDeployment, context);
assertEquals(
"trigger_0",
@@ -530,4 +535,77 @@ public class ApplicationReconcilerTest {
UpgradeMode.SAVEPOINT,
reconStatus.deserializeLastReconciledSpec().getJob().getUpgradeMode());
}
+
+ /**
+  * Changes the job parallelism without the reactive scheduler mode configured and expects the
+  * last reconciled spec to record the job as suspended.
+  */
+ @Test
+ public void testScaleWithReactiveModeDisabled() throws Exception {
+     final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+     reconciler.reconcile(deployment, context);
+     verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+     deployment.getSpec().getJob().setParallelism(100);
+     reconciler.reconcile(deployment, context);
+
+     final var lastReconciledSpec =
+             deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+     assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+ }
+
+ /**
+  * With standalone mode and the reactive scheduler mode configured, parallelism changes are
+  * expected to keep the job running and only update the desired replica count recorded by the
+  * testing service.
+  */
+ @Test
+ public void testScaleWithReactiveModeEnabled() throws Exception {
+     final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+     deployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+     deployment
+             .getSpec()
+             .getFlinkConfiguration()
+             .put(
+                     JobManagerOptions.SCHEDULER_MODE.key(),
+                     SchedulerExecutionMode.REACTIVE.name());
+
+     reconciler.reconcile(deployment, context);
+     verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+     // the default.parallelism is always ignored
+     deployment
+             .getSpec()
+             .getFlinkConfiguration()
+             .put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
+     reconciler.reconcile(deployment, context);
+     assertRunningWithDesiredReplicas(deployment, 0);
+
+     deployment.getSpec().getJob().setParallelism(4);
+     reconciler.reconcile(deployment, context);
+     assertRunningWithDesiredReplicas(deployment, 2);
+
+     deployment.getSpec().getJob().setParallelism(8);
+     reconciler.reconcile(deployment, context);
+     assertRunningWithDesiredReplicas(deployment, 4);
+ }
+
+ /**
+  * Asserts that the last reconciled spec reports a running job and that the testing service
+  * recorded the expected desired replica count.
+  */
+ private void assertRunningWithDesiredReplicas(
+         FlinkDeployment deployment, int expectedReplicas) {
+     final var lastReconciledSpec =
+             deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+     assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+     assertEquals(expectedReplicas, flinkService.getDesiredReplicas());
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
new file mode 100644
index 00000000..6f39669a
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.diff;
+
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Spec diff test.
+ *
+ * <p>Both tests mutate the {@code right} spec step by step and never reset it, so the expected
+ * number of differences accumulates from one assertion group to the next.
+ */
+public class SpecDiffTest {
+
+ /** Verifies diff type and diff count for accumulating changes to a FlinkDeploymentSpec. */
+ @Test
+ public void testFlinkDeploymentSpecChanges() {
+ // An empty spec compared with its clone: no differences.
+ var left = new FlinkDeploymentSpec();
+ var right = ReconciliationUtils.clone(left);
+ var diff = left.diff(right);
+ assertEquals(DiffType.IGNORE, diff.getType());
+ assertEquals(0, diff.getNumDiffs());
+
+ // A session cluster spec compared with its clone: no differences.
+ left = TestUtils.buildSessionCluster().getSpec();
+ right = ReconciliationUtils.clone(left);
+ diff = left.diff(right);
+ assertEquals(DiffType.IGNORE, diff.getType());
+ assertEquals(0, diff.getNumDiffs());
+
+ // An application cluster spec with pod template and ingress set, compared with its clone.
+ left = TestUtils.buildApplicationCluster().getSpec();
+ left.setPodTemplate(TestUtils.getTestPod("localhost", "v1", List.of()));
+ left.setIngress(IngressSpec.builder().template("template").build());
+
+ right = ReconciliationUtils.clone(left);
+ diff = left.diff(right);
+ assertEquals(DiffType.IGNORE, diff.getType());
+ assertEquals(0, diff.getNumDiffs());
+
+ // 4 job fields + 3 flinkConfiguration entries change: 7 diffs, all expected to be IGNORE.
+ assertEquals(0, diff.getNumDiffs());
+ right.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ right.getJob().setAllowNonRestoredState(true);
+ right.getJob().setInitialSavepointPath("local:///tmp");
+ right.getJob().setSavepointTriggerNonce(123L);
+ right.getFlinkConfiguration().put(OPERATOR_RECONCILE_INTERVAL.key(), "100 SECONDS");
+ right.getFlinkConfiguration().put(SCOPE_NAMING_KUBERNETES_OPERATOR.key(), "foo.bar");
+ right.getFlinkConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
+
+ diff = left.diff(right);
+ assertEquals(DiffType.IGNORE, diff.getType());
+ assertEquals(7, diff.getNumDiffs());
+
+ // Removing one of the added config entries again: 7 - 1 = 6 diffs.
+ right.getFlinkConfiguration().remove(SCOPE_NAMING_KUBERNETES_OPERATOR.key());
+
+ diff = left.diff(right);
+ assertEquals(DiffType.IGNORE, diff.getType());
+ assertEquals(6, diff.getNumDiffs());
+
+ // A job parallelism change is expected to raise the overall type to SCALE: 6 + 1 = 7.
+ right.getJob().setParallelism(100);
+
+ diff = left.diff(right);
+ assertEquals(DiffType.SCALE, diff.getType());
+ assertEquals(7, diff.getNumDiffs());
+
+ // 6 top-level spec fields change, expected to raise the type to UPGRADE: 7 + 6 = 13.
+ right.setImage("flink:greatest");
+ right.setImagePullPolicy("never:pull");
+ right.setServiceAccount("anonymous");
+ right.setFlinkVersion(FlinkVersion.v1_13);
+ right.setMode(KubernetesDeploymentMode.STANDALONE);
+ right.setLogConfiguration(Map.of("foo", "bar"));
+
+ diff = left.diff(right);
+ assertEquals(DiffType.UPGRADE, diff.getType());
+ assertEquals(13, diff.getNumDiffs());
+
+ // 4 changes inside nested objects (JM resource, TM, pod template, ingress): 13 + 4 = 17.
+ right.getJobManager().getResource().setMemory("999m");
+ right.getTaskManager().setReplicas(999);
+ right.getPodTemplate().setApiVersion("v2");
+ right.getIngress().setTemplate("none");
+
+ diff = left.diff(right);
+ assertEquals(DiffType.UPGRADE, diff.getType());
+ assertEquals(17, diff.getNumDiffs());
+
+ // 4 more job fields change: 17 + 4 = 21.
+ right.getJob().setJarURI("missing.jar");
+ right.getJob().setEntryClass("missing.Class");
+ right.getJob().setArgs(new String[] {"foo", "bar"});
+ right.getJob().setState(JobState.SUSPENDED);
+
+ diff = left.diff(right);
+ assertEquals(DiffType.UPGRADE, diff.getType());
+ assertEquals(21, diff.getNumDiffs());
+
+ // One additional config entry: 21 + 1 = 22.
+ right.getFlinkConfiguration().put(CoreOptions.FLINK_TM_JVM_OPTIONS.key(), "-Dfoo=bar");
+
+ diff = left.diff(right);
+ assertEquals(DiffType.UPGRADE, diff.getType());
+ assertEquals(22, diff.getNumDiffs());
+ }
+
+ /** Verifies diff type and diff count for accumulating changes to a FlinkSessionJobSpec. */
+ @Test
+ public void testFlinkSessionJobSpecChanges() {
+ // An empty spec compared with its clone: no differences.
+ var left = new FlinkSessionJobSpec();
+ var right = ReconciliationUtils.clone(left);
+ var diff = left.diff(right);
+ assertEquals(DiffType.IGNORE, diff.getType());
+ assertEquals(0, diff.getNumDiffs());
+
+ // A session job spec compared with its clone: no differences.
+ left = TestUtils.buildSessionJob().getSpec();
+ right = ReconciliationUtils.clone(left);
+ diff = left.diff(right);
+ assertEquals(DiffType.IGNORE, diff.getType());
+ assertEquals(0, diff.getNumDiffs());
+
+ // 4 job fields + 1 changed config value: 5 diffs, all expected to be IGNORE.
+ assertEquals(0, diff.getNumDiffs());
+ right.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ right.getJob().setAllowNonRestoredState(true);
+ right.getJob().setInitialSavepointPath("local:///tmp");
+ right.getJob().setSavepointTriggerNonce(123L);
+ right.getFlinkConfiguration()
+ .put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "changed");
+
+ diff = left.diff(right);
+ assertEquals(DiffType.IGNORE, diff.getType());
+ assertEquals(5, diff.getNumDiffs());
+
+ // A job parallelism change is expected to raise the overall type to SCALE: 5 + 1 = 6.
+ right.getJob().setParallelism(100);
+
+ diff = left.diff(right);
+ assertEquals(DiffType.SCALE, diff.getType());
+ assertEquals(6, diff.getNumDiffs());
+
+ // Deployment name + 4 job fields change, expected type UPGRADE: 6 + 5 = 11.
+ right.setDeploymentName("missing");
+ right.getJob().setJarURI("missing.jar");
+ right.getJob().setEntryClass("missing.Class");
+ right.getJob().setArgs(new String[] {"foo", "bar"});
+ right.getJob().setState(JobState.SUSPENDED);
+
+ diff = left.diff(right);
+ assertEquals(DiffType.UPGRADE, diff.getType());
+ assertEquals(11, diff.getNumDiffs());
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index ec09f546..defe48a3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -18,15 +18,20 @@
package org.apache.flink.kubernetes.operator.service;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -36,6 +41,8 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link StandaloneFlinkService unit tests */
@EnableKubernetesMockClient(crud = true)
@@ -61,7 +68,7 @@ public class StandaloneFlinkServiceTest {
FlinkDeployment flinkDeployment = TestUtils.buildSessionCluster();
configuration = buildConfig(flinkDeployment, configuration);
- createDeployments();
+ createDeployments(flinkDeployment);
List<Deployment> deployments = kubernetesClient.apps().deployments().list().getItems();
@@ -80,7 +87,7 @@ public class StandaloneFlinkServiceTest {
FlinkDeployment flinkDeployment = TestUtils.buildSessionCluster();
configuration = buildConfig(flinkDeployment, configuration);
- createDeployments();
+ createDeployments(flinkDeployment);
List<Deployment> deployments = kubernetesClient.apps().deployments().list().getItems();
assertEquals(2, deployments.size());
@@ -93,6 +100,83 @@ public class StandaloneFlinkServiceTest {
assertEquals(0, deployments.size());
}
+ @Test
+ public void testReactiveScale() throws Exception {
+ var flinkDeployment = TestUtils.buildApplicationCluster();
+ var clusterId = flinkDeployment.getMetadata().getName();
+ var namespace = flinkDeployment.getMetadata().getNamespace();
+ flinkDeployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+ flinkDeployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(
+ JobManagerOptions.SCHEDULER_MODE.key(),
+ SchedulerExecutionMode.REACTIVE.name());
+ createDeployments(flinkDeployment);
+ assertTrue(
+ flinkStandaloneService.scale(
+ flinkDeployment.getMetadata(),
+ flinkDeployment.getSpec().getJob(),
+ buildConfig(flinkDeployment, configuration)));
+
+ assertEquals(
+ 1,
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+ .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+ .get()
+ .getSpec()
+ .getReplicas());
+
+ flinkDeployment.getSpec().getJob().setParallelism(4);
+ assertTrue(
+ flinkStandaloneService.scale(
+ flinkDeployment.getMetadata(),
+ flinkDeployment.getSpec().getJob(),
+ buildConfig(flinkDeployment, configuration)));
+ assertEquals(
+ 2,
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+ .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+ .get()
+ .getSpec()
+ .getReplicas());
+
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+ .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+ .delete();
+ assertFalse(
+ flinkStandaloneService.scale(
+ flinkDeployment.getMetadata(),
+ flinkDeployment.getSpec().getJob(),
+ buildConfig(flinkDeployment, configuration)));
+
+ createDeployments(flinkDeployment);
+ assertTrue(
+ flinkStandaloneService.scale(
+ flinkDeployment.getMetadata(),
+ flinkDeployment.getSpec().getJob(),
+ buildConfig(flinkDeployment, configuration)));
+
+ flinkDeployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .remove(JobManagerOptions.SCHEDULER_MODE.key());
+ assertFalse(
+ flinkStandaloneService.scale(
+ flinkDeployment.getMetadata(),
+ flinkDeployment.getSpec().getJob(),
+ buildConfig(flinkDeployment, configuration)));
+ }
+
private Configuration buildConfig(FlinkDeployment flinkDeployment, Configuration configuration)
throws Exception {
return FlinkConfigBuilder.buildFrom(
@@ -102,29 +186,28 @@ public class StandaloneFlinkServiceTest {
configuration);
}
- private void createDeployments() {
+ private void createDeployments(AbstractFlinkResource cr) {
Deployment jmDeployment = new Deployment();
ObjectMeta jmMetadata = new ObjectMeta();
jmMetadata.setName(
- StandaloneKubernetesUtils.getJobManagerDeploymentName(
- TestUtils.TEST_DEPLOYMENT_NAME));
+ StandaloneKubernetesUtils.getJobManagerDeploymentName(cr.getMetadata().getName()));
jmDeployment.setMetadata(jmMetadata);
kubernetesClient
.apps()
.deployments()
- .inNamespace(TestUtils.TEST_NAMESPACE)
- .create(jmDeployment);
+ .inNamespace(cr.getMetadata().getNamespace())
+ .createOrReplace(jmDeployment);
Deployment tmDeployment = new Deployment();
ObjectMeta tmMetadata = new ObjectMeta();
tmMetadata.setName(
- StandaloneKubernetesUtils.getTaskManagerDeploymentName(
- TestUtils.TEST_DEPLOYMENT_NAME));
+ StandaloneKubernetesUtils.getTaskManagerDeploymentName(cr.getMetadata().getName()));
tmDeployment.setMetadata(tmMetadata);
+ tmDeployment.setSpec(new DeploymentSpec());
kubernetesClient
.apps()
.deployments()
- .inNamespace(TestUtils.TEST_NAMESPACE)
- .create(tmDeployment);
+ .inNamespace(cr.getMetadata().getNamespace())
+ .createOrReplace(tmDeployment);
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index dc2ac584..418bcb33 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -59,7 +59,7 @@ public class FlinkUtilsTest {
Container container2 = new Container();
container2.setName("container2");
- Pod pod1 = TestUtils.getTestPod("pod1 hostname", "pod1 api version", List.of(container2));
+ Pod pod1 = TestUtils.getTestPod("pod1 hostname", "pod1 api version", List.of());
Pod pod2 =
TestUtils.getTestPod(