Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/18 10:15:46 UTC

[GitHub] [flink-kubernetes-operator] nowke opened a new pull request, #487: [FLINK-30411] Validate empty JmSpec and TmSpec

nowke opened a new pull request, #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487

   ## What is the purpose of the change
   
   This PR adds validation for a `FlinkDeployment` that omits the `jobManager` or `taskManager` spec. Currently, such a `FlinkDeployment` gets stuck in the `UPGRADING` state.
   
   With the validation in place, `helm install` fails immediately instead.
   
   **Example: `basic.yaml`**
   
   ```yaml
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: basic-example
   spec:
     image: flink:1.15
     flinkVersion: v1_15
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
     serviceAccount: flink
     job:
       jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
       parallelism: 2
       upgradeMode: stateless
   ```
   
   ## Brief change log
   
   - Validate `FlinkDeployment` without `jobManager` or `taskManager` spec.
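
As a rough illustration of the check described above, the sketch below treats memory as configured when the spec carries a non-blank `resource.memory`, and otherwise falls back to looking for a key in `flinkConfiguration`. The classes `Resource` and `ManagerSpec`, the body of `firstPresent`, and the single-key fallback are assumptions made for this example; the PR itself delegates the configuration check to Flink's process utilities.

```java
import java.util.Map;
import java.util.Optional;

class EmptySpecValidationSketch {

    // Hypothetical stand-ins for the operator's spec classes.
    static class Resource {
        final String memory;

        Resource(String memory) {
            this.memory = memory;
        }
    }

    static class ManagerSpec {
        final Resource resource;

        ManagerSpec(Resource resource) {
            this.resource = resource;
        }
    }

    // True only if the spec, its resource, and a non-blank memory value are all present.
    static boolean memoryDefined(ManagerSpec spec) {
        return spec != null
                && spec.resource != null
                && spec.resource.memory != null
                && !spec.resource.memory.trim().isEmpty();
    }

    // Simplified fallback (assumption): accept the deployment if one memory key
    // is set in flinkConfiguration.
    static Optional<String> validateMemory(
            String component, ManagerSpec spec, Map<String, String> conf, String fallbackKey) {
        if (memoryDefined(spec)) {
            return Optional.empty();
        }
        String value = conf.get(fallbackKey);
        if (value != null && !value.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(component + " resource memory must be defined");
    }

    // Returns the first validation error, if any (assumed behaviour of the helper in the diff).
    @SafeVarargs
    static Optional<String> firstPresent(Optional<String>... errors) {
        for (Optional<String> error : errors) {
            if (error.isPresent()) {
                return error;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Mirrors the example above: no jobManager/taskManager spec and no memory keys.
        Map<String, String> conf = Map.of("taskmanager.numberOfTaskSlots", "2");
        Optional<String> error =
                firstPresent(
                        validateMemory("JobManager", null, conf, "jobmanager.memory.process.size"),
                        validateMemory("TaskManager", null, conf, "taskmanager.memory.process.size"));
        System.out.println(error.orElse("valid"));
    }
}
```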
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Extended existing unit test for the change as part of the PR
   - Manually verified by removing `jobManager` and `taskManager` from [`examples/basic.yaml`](https://github.com/apache/flink-kubernetes-operator/blob/c8ed11dfde1e0fc57d8fa45b0121ecec362bd03d/examples/basic.yaml#L29-L36)
     - *Before change* - confirmed that `FlinkDeployment` gets stuck in the `UPGRADING` state.
     - *After change* - `helm install examples/basic.yaml` fails with an error message. Restoring the file deploys the job successfully.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no
     - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1057748254


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -241,15 +245,35 @@ private Optional<String> validateJobSpec(
     }
 
     private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, String> confMap) {
+        Configuration conf = Configuration.fromMap(confMap);
+        var jmMemoryDefined =
+                jmSpec != null
+                        && jmSpec.getResource() != null
+                        && !StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory());
+        Optional<String> jmMemoryValidation =
+                jmMemoryDefined ? Optional.empty() : validateJmMemoryConfig(conf);
+
         if (jmSpec == null) {
-            return Optional.empty();
+            return jmMemoryValidation;
         }
 
         return firstPresent(
+                jmMemoryValidation,
                 validateResources("JobManager", jmSpec.getResource()),
                 validateJmReplicas(jmSpec.getReplicas(), confMap));
     }
 
+    private Optional<String> validateJmMemoryConfig(Configuration conf) {
+        try {
+            JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(

Review Comment:
   Why do we use this method vs using `processSpecFromConfig`?





[GitHub] [flink-kubernetes-operator] nowke commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
nowke commented on code in PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1059562798


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java:
##########
@@ -248,13 +252,40 @@ public void testValidationWithoutDefaultConfig() {
         testError(
                 dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"),
                 "JobManager resource memory parse error");
-
         testError(
                 dep -> dep.getSpec().getTaskManager().getResource().setMemory(null),
-                "TaskManager resource memory must be defined");
+                "TaskManager memory configuration failed");

Review Comment:
   It is thrown as an `IllegalConfigurationException` - [JobManagerProcessUtils.java#L78](https://github.com/apache/flink/blob/5c3658aa06b79e8039043145560a1ad2bcce68b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java#L78)
   
   The full error message is very descriptive, for example:
   
   ```
   TaskManager memory configuration failed: Either required fine-grained memory 
   (taskmanager.memory.task.heap.size and taskmanager.memory.managed.size), 
   or Total Flink Memory size (Key: 'taskmanager.memory.flink.size' , default: null (fallback keys: [])), 
   or Total Process Memory size (Key: 'taskmanager.memory.process.size' , 
   default: null (fallback keys: [])) need to be configured explicitly.
   ```
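
The rule stated in that message can be sketched as a three-way check over the configuration keys it names. This is a simplified model that only tests whether the keys are present; the class and method names are hypothetical, and Flink's own utilities also parse and cross-check the sizes.

```java
import java.util.Map;
import java.util.Optional;

class TmMemoryOptionsSketch {

    // A key counts as set when it maps to a non-blank value.
    static boolean isSet(Map<String, String> conf, String key) {
        String value = conf.get(key);
        return value != null && !value.trim().isEmpty();
    }

    // Passes if fine-grained sizes, total Flink memory, or total process memory is configured.
    static Optional<String> checkTaskManagerMemory(Map<String, String> conf) {
        boolean fineGrained =
                isSet(conf, "taskmanager.memory.task.heap.size")
                        && isSet(conf, "taskmanager.memory.managed.size");
        boolean totalFlink = isSet(conf, "taskmanager.memory.flink.size");
        boolean totalProcess = isSet(conf, "taskmanager.memory.process.size");

        if (fineGrained || totalFlink || totalProcess) {
            return Optional.empty();
        }
        // Shortened, illustrative message; not the text Flink produces.
        return Optional.of("TaskManager memory configuration failed: no memory size configured");
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("taskmanager.memory.process.size", "2048m");
        System.out.println(checkTaskManagerMemory(conf).orElse("valid"));
    }
}
```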





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1059629736


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java:
##########
@@ -248,13 +252,40 @@ public void testValidationWithoutDefaultConfig() {
         testError(
                 dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"),
                 "JobManager resource memory parse error");
-
         testError(
                 dep -> dep.getSpec().getTaskManager().getResource().setMemory(null),
-                "TaskManager resource memory must be defined");
+                "TaskManager memory configuration failed");

Review Comment:
   In this case, I would expect an error instructing the user to set `spec.taskManager.resource.memory` instead of the Flink configs directly.





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487




[GitHub] [flink-kubernetes-operator] nowke commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
nowke commented on code in PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1061092420


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java:
##########
@@ -248,13 +252,40 @@ public void testValidationWithoutDefaultConfig() {
         testError(
                 dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"),
                 "JobManager resource memory parse error");
-
         testError(
                 dep -> dep.getSpec().getTaskManager().getResource().setMemory(null),
-                "TaskManager resource memory must be defined");
+                "TaskManager memory configuration failed");

Review Comment:
   Changed it to `TaskManager resource memory must be defined using 'spec.taskManager.resource.memory'`. 
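
One way such a message could be built for both components is sketched below. The helper name `memoryUndefinedMessage` and the derivation of the field name from the component name are assumptions for illustration, not code from the PR.

```java
class MemoryErrorMessageSketch {

    // Builds an error that points the user at the spec field instead of raw Flink options.
    static String memoryUndefinedMessage(String component) {
        // "TaskManager" -> "taskManager", matching the field name in the FlinkDeployment spec.
        String field = Character.toLowerCase(component.charAt(0)) + component.substring(1);
        return String.format(
                "%s resource memory must be defined using 'spec.%s.resource.memory'",
                component, field);
    }

    public static void main(String[] args) {
        System.out.println(memoryUndefinedMessage("TaskManager"));
        System.out.println(memoryUndefinedMessage("JobManager"));
    }
}
```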





[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#issuecomment-1360392616

   @gyfora how do you think we should fix this?




[GitHub] [flink-kubernetes-operator] nowke commented on pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
nowke commented on PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#issuecomment-1360919023

   > @gyfora how do you think we should fix this?
   
   @morhidi @gyfora I added further validations with configs set through `flinkConfiguration`. It does make the validation logic a little complicated. Do you suggest any alternative approach?
   
   Besides helm validations, the deployment gets stuck in the `UPGRADING` state due to `ClusterDeploymentException`. Should the operator instead fail the deployment after encountering `ClusterDeploymentException` a fixed number of times? Even with the validations in this PR, a deployment can still end up in this state if insufficient memory is configured.
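
The idea raised in that question could look roughly like the following sketch: count failed deployment attempts per resource and give up once a threshold is reached. Everything here is hypothetical (class name, method names, threshold handling); it does not describe how the operator behaves.

```java
import java.util.HashMap;
import java.util.Map;

class DeploymentRetrySketch {

    private final int maxAttempts;
    private final Map<String, Integer> failures = new HashMap<>();

    DeploymentRetrySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Records one failed attempt; returns true once the deployment should be
    // marked as failed instead of being retried again.
    boolean recordFailure(String deploymentName) {
        int count = failures.merge(deploymentName, 1, Integer::sum);
        return count >= maxAttempts;
    }

    // A successful deployment resets the counter for that resource.
    void recordSuccess(String deploymentName) {
        failures.remove(deploymentName);
    }

    public static void main(String[] args) {
        DeploymentRetrySketch tracker = new DeploymentRetrySketch(3);
        for (int attempt = 1; attempt <= 3; attempt++) {
            boolean giveUp = tracker.recordFailure("basic-example");
            System.out.println("attempt " + attempt + ", give up: " + giveUp);
        }
    }
}
```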




[GitHub] [flink-kubernetes-operator] nowke commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
nowke commented on code in PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1057990470


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -241,15 +245,35 @@ private Optional<String> validateJobSpec(
     }
 
     private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, String> confMap) {
+        Configuration conf = Configuration.fromMap(confMap);
+        var jmMemoryDefined =
+                jmSpec != null
+                        && jmSpec.getResource() != null
+                        && !StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory());
+        Optional<String> jmMemoryValidation =
+                jmMemoryDefined ? Optional.empty() : validateJmMemoryConfig(conf);
+
         if (jmSpec == null) {
-            return Optional.empty();
+            return jmMemoryValidation;
         }
 
         return firstPresent(
+                jmMemoryValidation,
                 validateResources("JobManager", jmSpec.getResource()),
                 validateJmReplicas(jmSpec.getReplicas(), confMap));
     }
 
+    private Optional<String> validateJmMemoryConfig(Configuration conf) {
+        try {
+            JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(

Review Comment:
   - `processSpecFromConfigWithNewOptionToInterpretLegacyHeap` internally calls `processSpecFromConfig` - [JobManagerProcessUtils.java#L73-L74](https://github.com/apache/flink/blob/75a92efd7b35501698e5de253e5231d680830c16/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java#L73-L74)
   - Also, `processSpecFromConfig` is not `public` but package-private - [JobManagerProcessUtils.java#L82](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java#L82)





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1058430979


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -241,15 +245,35 @@ private Optional<String> validateJobSpec(
     }
 
     private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, String> confMap) {
+        Configuration conf = Configuration.fromMap(confMap);
+        var jmMemoryDefined =
+                jmSpec != null
+                        && jmSpec.getResource() != null
+                        && !StringUtils.isNullOrWhitespaceOnly(jmSpec.getResource().getMemory());
+        Optional<String> jmMemoryValidation =
+                jmMemoryDefined ? Optional.empty() : validateJmMemoryConfig(conf);
+
         if (jmSpec == null) {
-            return Optional.empty();
+            return jmMemoryValidation;
         }
 
         return firstPresent(
+                jmMemoryValidation,
                 validateResources("JobManager", jmSpec.getResource()),
                 validateJmReplicas(jmSpec.getReplicas(), confMap));
     }
 
+    private Optional<String> validateJmMemoryConfig(Configuration conf) {
+        try {
+            JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
+                    conf, JobManagerOptions.JVM_HEAP_MEMORY);
+        } catch (IllegalConfigurationException e) {

Review Comment:
   I think we should catch any exception here in case Flink throws something else.
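
A minimal sketch of that suggestion, assuming a hypothetical `runCheck` helper: the check runs inside a broad `catch (Exception e)` so that any failure, not only `IllegalConfigurationException`, becomes a validation error instead of escaping the validator.

```java
import java.util.Optional;

class CatchAllValidationSketch {

    // Runs a check and converts any thrown exception into a validation error message.
    static Optional<String> runCheck(String component, Runnable check) {
        try {
            check.run();
            return Optional.empty();
        } catch (Exception e) {
            // Broad catch on purpose: an unexpected exception type is still reported to the user.
            return Optional.of(component + " memory configuration failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        Optional<String> ok = runCheck("JobManager", () -> {});
        Optional<String> failed =
                runCheck(
                        "TaskManager",
                        () -> {
                            throw new IllegalStateException("unexpected failure");
                        });
        System.out.println(ok.orElse("valid"));
        System.out.println(failed.orElse("valid"));
    }
}
```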



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java:
##########
@@ -248,13 +252,40 @@ public void testValidationWithoutDefaultConfig() {
         testError(
                 dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"),
                 "JobManager resource memory parse error");
-
         testError(
                 dep -> dep.getSpec().getTaskManager().getResource().setMemory(null),
-                "TaskManager resource memory must be defined");
+                "TaskManager memory configuration failed");

Review Comment:
   I think the previous error message was much more informative for the user. 
   "TaskManager memory configuration failed" is pretty hard to understand.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -261,16 +285,40 @@ private Optional<String> validateJmReplicas(int replicas, Map<String, String> co
         return Optional.empty();
     }
 
-    private Optional<String> validateTmSpec(TaskManagerSpec tmSpec) {
+    private Optional<String> validateTmSpec(TaskManagerSpec tmSpec, Map<String, String> confMap) {
+        Configuration conf = Configuration.fromMap(confMap);
+
+        var tmMemoryDefined =
+                tmSpec != null
+                        && tmSpec.getResource() != null
+                        && !StringUtils.isNullOrWhitespaceOnly(tmSpec.getResource().getMemory());
+        Optional<String> tmMemoryConfigValidation =
+                tmMemoryDefined ? Optional.empty() : validateTmMemoryConfig(conf);
+
         if (tmSpec == null) {
-            return Optional.empty();
+            return tmMemoryConfigValidation;
         }
 
+        return firstPresent(
+                tmMemoryConfigValidation,
+                validateResources("TaskManager", tmSpec.getResource()),
+                validateTmReplicas(tmSpec));
+    }
+
+    private Optional<String> validateTmMemoryConfig(Configuration conf) {
+        try {
+            TaskExecutorProcessUtils.processSpecFromConfig(conf);
+        } catch (IllegalConfigurationException e) {

Review Comment:
   I think we should catch any exception here in case Flink throws something else.





[GitHub] [flink-kubernetes-operator] nowke commented on pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

Posted by GitBox <gi...@apache.org>.
nowke commented on PR #487:
URL: https://github.com/apache/flink-kubernetes-operator/pull/487#issuecomment-1368142154

   > I am still a bit torn on this PR. This adds quite a lot of complexity to the validation with some risks involved. Maybe we should just leave it as it is?
   
   Agreed on the added complexity, given the scope of the issue. The only additional benefit we get from `processSpecFromConfig` in the TM/JM utils is validating different combinations of `flinkConfiguration`, for example: [https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1059562798](https://github.com/apache/flink-kubernetes-operator/pull/487#discussion_r1059562798).
   
   I am totally fine with closing the PR unless you have an alternative approach in mind.

