You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/04 13:07:27 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-30411] Validate empty JmSpec and TmSpec
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 4feee47f [FLINK-30411] Validate empty JmSpec and TmSpec
4feee47f is described below
commit 4feee47f0175d12ad192fbfc89d483793dcae5b8
Author: Navaneesh Kumar <na...@gmail.com>
AuthorDate: Wed Jan 4 05:07:21 2023 -0800
[FLINK-30411] Validate empty JmSpec and TmSpec
---
.../operator/validation/DefaultValidator.java | 63 +++++++++++++++++++---
.../operator/validation/DefaultValidatorTest.java | 37 +++++++++++--
2 files changed, 90 insertions(+), 10 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 91ab375e..d9af5891 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.validation;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -41,6 +42,8 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.util.StringUtils;
import javax.annotation.Nullable;
@@ -87,7 +90,7 @@ public class DefaultValidator implements FlinkResourceValidator {
validateLogConfig(spec.getLogConfiguration()),
validateJobSpec(spec.getJob(), spec.getTaskManager(), effectiveConfig),
validateJmSpec(spec.getJobManager(), effectiveConfig),
- validateTmSpec(spec.getTaskManager()),
+ validateTmSpec(spec.getTaskManager(), effectiveConfig),
validateSpecChange(deployment, effectiveConfig),
validateServiceAccount(spec.getServiceAccount()));
}
@@ -258,15 +261,36 @@ public class DefaultValidator implements FlinkResourceValidator {
}
private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, String> confMap) {
+ Configuration conf = Configuration.fromMap(confMap);
+ var jmMemoryDefined =
+ jmSpec != null
+ && jmSpec.getResource() != null
+ && !StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory());
+ Optional<String> jmMemoryValidation =
+ jmMemoryDefined ? Optional.empty() : validateJmMemoryConfig(conf);
+
if (jmSpec == null) {
- return Optional.empty();
+ return jmMemoryValidation;
}
return firstPresent(
+ jmMemoryValidation,
validateResources("JobManager", jmSpec.getResource()),
validateJmReplicas(jmSpec.getReplicas(), confMap));
}
+ private Optional<String> validateJmMemoryConfig(Configuration conf) {
+ try {
+ JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
+ conf, JobManagerOptions.JVM_HEAP_MEMORY);
+ } catch (Exception e) {
+ return Optional.of(
+ "JobManager resource memory must be defined using `spec.jobManager.resource.memory`");
+ }
+
+ return Optional.empty();
+ }
+
private Optional<String> validateJmReplicas(int replicas, Map<String, String> confMap) {
if (replicas < 1) {
return Optional.of("JobManager replicas should not be configured less than one.");
@@ -278,16 +302,41 @@ public class DefaultValidator implements FlinkResourceValidator {
return Optional.empty();
}
- private Optional<String> validateTmSpec(TaskManagerSpec tmSpec) {
+ private Optional<String> validateTmSpec(TaskManagerSpec tmSpec, Map<String, String> confMap) {
+ Configuration conf = Configuration.fromMap(confMap);
+
+ var tmMemoryDefined =
+ tmSpec != null
+ && tmSpec.getResource() != null
+ && !StringUtils.isNullOrWhitespaceOnly(tmSpec.getResource().getMemory());
+ Optional<String> tmMemoryConfigValidation =
+ tmMemoryDefined ? Optional.empty() : validateTmMemoryConfig(conf);
+
if (tmSpec == null) {
- return Optional.empty();
+ return tmMemoryConfigValidation;
}
+ return firstPresent(
+ tmMemoryConfigValidation,
+ validateResources("TaskManager", tmSpec.getResource()),
+ validateTmReplicas(tmSpec));
+ }
+
+ private Optional<String> validateTmMemoryConfig(Configuration conf) {
+ try {
+ TaskExecutorProcessUtils.processSpecFromConfig(conf);
+ } catch (Exception e) {
+ return Optional.of(
+ "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
+ }
+ return Optional.empty();
+ }
+
+ private Optional<String> validateTmReplicas(TaskManagerSpec tmSpec) {
if (tmSpec.getReplicas() != null && tmSpec.getReplicas() < 1) {
return Optional.of("TaskManager replicas should not be configured less than one.");
}
-
- return validateResources("TaskManager", tmSpec.getResource());
+ return Optional.empty();
}
private Optional<String> validateResources(String component, Resource resource) {
@@ -297,7 +346,7 @@ public class DefaultValidator implements FlinkResourceValidator {
String memory = resource.getMemory();
if (memory == null) {
- return Optional.of(component + " resource memory must be defined.");
+ return Optional.empty();
}
try {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 7036618e..9576c3e2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -59,6 +59,10 @@ import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
+import static org.apache.flink.configuration.JobManagerOptions.JVM_HEAP_MEMORY;
+import static org.apache.flink.configuration.TaskManagerOptions.MANAGED_MEMORY_SIZE;
+import static org.apache.flink.configuration.TaskManagerOptions.TASK_HEAP_MEMORY;
+import static org.apache.flink.configuration.TaskManagerOptions.TOTAL_FLINK_MEMORY;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -251,13 +255,40 @@ public class DefaultValidatorTest {
testError(
dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"),
"JobManager resource memory parse error");
-
testError(
dep -> dep.getSpec().getTaskManager().getResource().setMemory(null),
- "TaskManager resource memory must be defined");
+ "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
testError(
dep -> dep.getSpec().getJobManager().getResource().setMemory(null),
- "JobManager resource memory must be defined");
+ "JobManager resource memory must be defined using `spec.jobManager.resource.memory`");
+ testError(
+ dep -> {
+ dep.getSpec().getTaskManager().getResource().setMemory(null);
+ dep.getSpec().setFlinkConfiguration(Map.of(TASK_HEAP_MEMORY.key(), "1024m"));
+ },
+ "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
+
+ testSuccess(
+ dep -> {
+ dep.getSpec().getJobManager().getResource().setMemory(null);
+ dep.getSpec().setFlinkConfiguration(Map.of(JVM_HEAP_MEMORY.key(), "2048m"));
+ });
+ testSuccess(
+ dep -> {
+ dep.getSpec().getTaskManager().getResource().setMemory(null);
+ dep.getSpec().setFlinkConfiguration(Map.of(TOTAL_FLINK_MEMORY.key(), "2048m"));
+ });
+ testSuccess(
+ dep -> {
+ dep.getSpec().getTaskManager().getResource().setMemory(null);
+ dep.getSpec()
+ .setFlinkConfiguration(
+ Map.of(
+ TASK_HEAP_MEMORY.key(),
+ "1024m",
+ MANAGED_MEMORY_SIZE.key(),
+ "1024m"));
+ });
// Test savepoint restore validation
testSuccess(