You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/04 13:07:27 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-30411] Validate empty JmSpec and TmSpec

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4feee47f [FLINK-30411] Validate empty JmSpec and TmSpec
4feee47f is described below

commit 4feee47f0175d12ad192fbfc89d483793dcae5b8
Author: Navaneesh Kumar <na...@gmail.com>
AuthorDate: Wed Jan 4 05:07:21 2023 -0800

    [FLINK-30411] Validate empty JmSpec and TmSpec
---
 .../operator/validation/DefaultValidator.java      | 63 +++++++++++++++++++---
 .../operator/validation/DefaultValidatorTest.java  | 37 +++++++++++--
 2 files changed, 90 insertions(+), 10 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 91ab375e..d9af5891 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.validation;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -41,6 +42,8 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
@@ -87,7 +90,7 @@ public class DefaultValidator implements FlinkResourceValidator {
                 validateLogConfig(spec.getLogConfiguration()),
                 validateJobSpec(spec.getJob(), spec.getTaskManager(), effectiveConfig),
                 validateJmSpec(spec.getJobManager(), effectiveConfig),
-                validateTmSpec(spec.getTaskManager()),
+                validateTmSpec(spec.getTaskManager(), effectiveConfig),
                 validateSpecChange(deployment, effectiveConfig),
                 validateServiceAccount(spec.getServiceAccount()));
     }
@@ -258,15 +261,36 @@ public class DefaultValidator implements FlinkResourceValidator {
     }
 
     private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, String> confMap) {
+        Configuration conf = Configuration.fromMap(confMap);
+        var jmMemoryDefined =
+                jmSpec != null
+                        && jmSpec.getResource() != null
+                        && !StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory());
+        Optional<String> jmMemoryValidation =
+                jmMemoryDefined ? Optional.empty() : validateJmMemoryConfig(conf);
+
         if (jmSpec == null) {
-            return Optional.empty();
+            return jmMemoryValidation;
         }
 
         return firstPresent(
+                jmMemoryValidation,
                 validateResources("JobManager", jmSpec.getResource()),
                 validateJmReplicas(jmSpec.getReplicas(), confMap));
     }
 
+    private Optional<String> validateJmMemoryConfig(Configuration conf) {
+        try {
+            JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
+                    conf, JobManagerOptions.JVM_HEAP_MEMORY);
+        } catch (Exception e) {
+            return Optional.of(
+                    "JobManager resource memory must be defined using `spec.jobManager.resource.memory`");
+        }
+
+        return Optional.empty();
+    }
+
     private Optional<String> validateJmReplicas(int replicas, Map<String, String> confMap) {
         if (replicas < 1) {
             return Optional.of("JobManager replicas should not be configured less than one.");
@@ -278,16 +302,41 @@ public class DefaultValidator implements FlinkResourceValidator {
         return Optional.empty();
     }
 
-    private Optional<String> validateTmSpec(TaskManagerSpec tmSpec) {
+    private Optional<String> validateTmSpec(TaskManagerSpec tmSpec, Map<String, String> confMap) {
+        Configuration conf = Configuration.fromMap(confMap);
+
+        var tmMemoryDefined =
+                tmSpec != null
+                        && tmSpec.getResource() != null
+                        && !StringUtils.isNullOrWhitespaceOnly(tmSpec.getResource().getMemory());
+        Optional<String> tmMemoryConfigValidation =
+                tmMemoryDefined ? Optional.empty() : validateTmMemoryConfig(conf);
+
         if (tmSpec == null) {
-            return Optional.empty();
+            return tmMemoryConfigValidation;
         }
 
+        return firstPresent(
+                tmMemoryConfigValidation,
+                validateResources("TaskManager", tmSpec.getResource()),
+                validateTmReplicas(tmSpec));
+    }
+
+    private Optional<String> validateTmMemoryConfig(Configuration conf) {
+        try {
+            TaskExecutorProcessUtils.processSpecFromConfig(conf);
+        } catch (Exception e) {
+            return Optional.of(
+                    "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
+        }
+        return Optional.empty();
+    }
+
+    private Optional<String> validateTmReplicas(TaskManagerSpec tmSpec) {
         if (tmSpec.getReplicas() != null && tmSpec.getReplicas() < 1) {
             return Optional.of("TaskManager replicas should not be configured less than one.");
         }
-
-        return validateResources("TaskManager", tmSpec.getResource());
+        return Optional.empty();
     }
 
     private Optional<String> validateResources(String component, Resource resource) {
@@ -297,7 +346,7 @@ public class DefaultValidator implements FlinkResourceValidator {
 
         String memory = resource.getMemory();
         if (memory == null) {
-            return Optional.of(component + " resource memory must be defined.");
+            return Optional.empty();
         }
 
         try {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 7036618e..9576c3e2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -59,6 +59,10 @@ import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
 
+import static org.apache.flink.configuration.JobManagerOptions.JVM_HEAP_MEMORY;
+import static org.apache.flink.configuration.TaskManagerOptions.MANAGED_MEMORY_SIZE;
+import static org.apache.flink.configuration.TaskManagerOptions.TASK_HEAP_MEMORY;
+import static org.apache.flink.configuration.TaskManagerOptions.TOTAL_FLINK_MEMORY;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -251,13 +255,40 @@ public class DefaultValidatorTest {
         testError(
                 dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"),
                 "JobManager resource memory parse error");
-
         testError(
                 dep -> dep.getSpec().getTaskManager().getResource().setMemory(null),
-                "TaskManager resource memory must be defined");
+                "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
         testError(
                 dep -> dep.getSpec().getJobManager().getResource().setMemory(null),
-                "JobManager resource memory must be defined");
+                "JobManager resource memory must be defined using `spec.jobManager.resource.memory`");
+        testError(
+                dep -> {
+                    dep.getSpec().getTaskManager().getResource().setMemory(null);
+                    dep.getSpec().setFlinkConfiguration(Map.of(TASK_HEAP_MEMORY.key(), "1024m"));
+                },
+                "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
+
+        testSuccess(
+                dep -> {
+                    dep.getSpec().getJobManager().getResource().setMemory(null);
+                    dep.getSpec().setFlinkConfiguration(Map.of(JVM_HEAP_MEMORY.key(), "2048m"));
+                });
+        testSuccess(
+                dep -> {
+                    dep.getSpec().getTaskManager().getResource().setMemory(null);
+                    dep.getSpec().setFlinkConfiguration(Map.of(TOTAL_FLINK_MEMORY.key(), "2048m"));
+                });
+        testSuccess(
+                dep -> {
+                    dep.getSpec().getTaskManager().getResource().setMemory(null);
+                    dep.getSpec()
+                            .setFlinkConfiguration(
+                                    Map.of(
+                                            TASK_HEAP_MEMORY.key(),
+                                            "1024m",
+                                            MANAGED_MEMORY_SIZE.key(),
+                                            "1024m"));
+                });
 
         // Test savepoint restore validation
         testSuccess(