You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "HuangZhenQiu (via GitHub)" <gi...@apache.org> on 2023/04/03 23:11:25 UTC

[GitHub] [flink-kubernetes-operator] HuangZhenQiu opened a new pull request, #561: [FLINK-30609] Add ephemeral storage to CRD

HuangZhenQiu opened a new pull request, #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561

   ## What is the purpose of the change
   Add ephemeral storage to CRD, so that users can apply ephemeral storage as resource requirement to containers of JM and TM.
   
   ## Brief change log 
   
     - Add ephemeral storage to Resource of flink deployment spec
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
     - Changes are tested in unit test in FlinkConfigBuilderTest
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes)
     - Core observer or reconciler logic that is regularly executed: ( no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] ashangit commented on pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "ashangit (via GitHub)" <gi...@apache.org>.
ashangit commented on PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#issuecomment-1495516034

   I'm wondering if it won't be cleaner to let flink manage the resources field as it is done for cpu, memory and external resources (see https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java#L168)
   and having the operator only providing the ephemeral storage config inside the flink configuration?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] HuangZhenQiu commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "HuangZhenQiu (via GitHub)" <gi...@apache.org>.
HuangZhenQiu commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1156779386


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -409,15 +419,44 @@ private static void setPodTemplate(
             effectiveConfig.setString(
                     podConfigOption,
                     createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+                            applyResourceToPodTemplate(
+                                    mergePodTemplates(
+                                            basicPod,
+                                            appendPod,
+                                            effectiveConfig.get(
+                                                    KubernetesOperatorConfigOptions
+                                                            .POD_TEMPLATE_MERGE_BY_NAME)),
+                                    resource)));
         }
     }
 
+    private static Pod applyResourceToPodTemplate(Pod podTemplate, Resource resource) {
+        if (resource != null
+                && !StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())
+                && podTemplate != null
+                && podTemplate.getSpec() != null) {
+            for (Container container : podTemplate.getSpec().getContainers()) {

Review Comment:
   I was confused about the requirement.  It makes more sense to only apply it to flink main container.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "morhidi (via GitHub)" <gi...@apache.org>.
morhidi commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1156574710


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -366,6 +368,18 @@ private Optional<String> validateResources(String component, Resource resource)
             return Optional.of(component + " resource memory parse error: " + iae.getMessage());
         }
 
+        String storage = resource.getEphemeralStorage();

Review Comment:
   This will not catch misconfigured storage resources if memory is not defined.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] HuangZhenQiu commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "HuangZhenQiu (via GitHub)" <gi...@apache.org>.
HuangZhenQiu commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1156778567


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -409,15 +419,44 @@ private static void setPodTemplate(
             effectiveConfig.setString(
                     podConfigOption,
                     createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+                            applyResourceToPodTemplate(
+                                    mergePodTemplates(
+                                            basicPod,
+                                            appendPod,
+                                            effectiveConfig.get(
+                                                    KubernetesOperatorConfigOptions
+                                                            .POD_TEMPLATE_MERGE_BY_NAME)),
+                                    resource)));
         }
     }
 
+    private static Pod applyResourceToPodTemplate(Pod podTemplate, Resource resource) {
+        if (resource != null
+                && !StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())
+                && podTemplate != null
+                && podTemplate.getSpec() != null) {

Review Comment:
   Added different scenarios in FlinkConfigBuilderTest



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] HuangZhenQiu commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "HuangZhenQiu (via GitHub)" <gi...@apache.org>.
HuangZhenQiu commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1157758601


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -391,31 +399,87 @@ private static void setResource(
     }
 
     private static void setPodTemplate(
-            Pod basicPod, Pod appendPod, Configuration effectiveConfig, boolean isJM)
+            Pod basicPod,
+            Pod appendPod,
+            Resource resource,
+            Configuration effectiveConfig,
+            boolean isJM)
             throws IOException {
 
-        if (basicPod == null && appendPod == null) {
-            return;
-        }
-
         // Avoid to create temporary pod template files for JobManager and TaskManager if it is not
         // configured explicitly via .spec.JobManagerSpec.podTemplate or
         // .spec.TaskManagerSpec.podTemplate.
+        final ConfigOption<String> podConfigOption =
+                isJM
+                        ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
+                        : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
+
+        Pod basicPodWithResource = applyResourceToPodTemplate(basicPod, resource);

Review Comment:
   Yes, you are right. I think the best way to add the feature without breaking feature is that:
   
   Always merge first as long as not both basicPod and appendPod are null, then apply resource to pod.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1157015140


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -391,31 +399,87 @@ private static void setResource(
     }
 
     private static void setPodTemplate(
-            Pod basicPod, Pod appendPod, Configuration effectiveConfig, boolean isJM)
+            Pod basicPod,
+            Pod appendPod,
+            Resource resource,
+            Configuration effectiveConfig,
+            boolean isJM)
             throws IOException {
 
-        if (basicPod == null && appendPod == null) {
-            return;
-        }
-
         // Avoid to create temporary pod template files for JobManager and TaskManager if it is not
         // configured explicitly via .spec.JobManagerSpec.podTemplate or
         // .spec.TaskManagerSpec.podTemplate.
+        final ConfigOption<String> podConfigOption =
+                isJM
+                        ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
+                        : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
+
+        Pod basicPodWithResource = applyResourceToPodTemplate(basicPod, resource);

Review Comment:
   Honestly I don't think this logic will work like this due to how Pod merging works by default. I recommend testing the following example which is likely to be broken currently:
   
   1. Top level (common) pod template empty.
   2. TM/JM level pod Template define 2 containers: [init, main-container] , in this order
   3. Define ephemeral storage
   
   I think the result will not be what you expect.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora merged PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] HuangZhenQiu commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "HuangZhenQiu (via GitHub)" <gi...@apache.org>.
HuangZhenQiu commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1157873527


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -391,31 +399,87 @@ private static void setResource(
     }
 
     private static void setPodTemplate(
-            Pod basicPod, Pod appendPod, Configuration effectiveConfig, boolean isJM)
+            Pod basicPod,
+            Pod appendPod,
+            Resource resource,
+            Configuration effectiveConfig,
+            boolean isJM)
             throws IOException {
 
-        if (basicPod == null && appendPod == null) {
-            return;
-        }
-
         // Avoid to create temporary pod template files for JobManager and TaskManager if it is not
         // configured explicitly via .spec.JobManagerSpec.podTemplate or
         // .spec.TaskManagerSpec.podTemplate.
+        final ConfigOption<String> podConfigOption =
+                isJM
+                        ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
+                        : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
+
+        Pod basicPodWithResource = applyResourceToPodTemplate(basicPod, resource);
+
         if (appendPod != null) {
-            final ConfigOption<String> podConfigOption =
-                    isJM
-                            ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
-                            : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
-            effectiveConfig.setString(
-                    podConfigOption,
-                    createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+            Pod mergedPodTemplate =
+                    mergePodTemplates(
+                            basicPodWithResource,
+                            appendPod,
+                            effectiveConfig.get(
+                                    KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME));
+            effectiveConfig.setString(podConfigOption, createTempFile(mergedPodTemplate));
+        } else {
+            effectiveConfig.setString(podConfigOption, createTempFile(basicPodWithResource));
+        }
+    }
+
+    private static Pod applyResourceToPodTemplate(Pod podTemplate, Resource resource) {
+        if (resource == null
+                || StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())) {
+            return podTemplate;
         }
+
+        if (podTemplate == null || podTemplate.getSpec() == null) {
+            PodSpec spec = new PodSpec();
+            spec.getContainers()
+                    .add(
+                            decorateContainerWithEphemeralStorage(
+                                    new Container(), resource.getEphemeralStorage()));
+            Pod newPodTemplate = new Pod();
+            newPodTemplate.setSpec(spec);
+            return newPodTemplate;

Review Comment:
   Thanks for pointing it out. Added the testApplyResourceToPodTemplate function in FlinkConfigBuilderTest to cover it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1156577779


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -409,15 +419,44 @@ private static void setPodTemplate(
             effectiveConfig.setString(
                     podConfigOption,
                     createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+                            applyResourceToPodTemplate(
+                                    mergePodTemplates(
+                                            basicPod,
+                                            appendPod,
+                                            effectiveConfig.get(
+                                                    KubernetesOperatorConfigOptions
+                                                            .POD_TEMPLATE_MERGE_BY_NAME)),
+                                    resource)));
         }
     }
 
+    private static Pod applyResourceToPodTemplate(Pod podTemplate, Resource resource) {
+        if (resource != null
+                && !StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())
+                && podTemplate != null
+                && podTemplate.getSpec() != null) {

Review Comment:
   This will ignore the ephemeralStorage setting if the user did not specify any podTemplate (or spec in the podTemplate), that is probably incorrect.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -409,15 +419,44 @@ private static void setPodTemplate(
             effectiveConfig.setString(
                     podConfigOption,
                     createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+                            applyResourceToPodTemplate(
+                                    mergePodTemplates(
+                                            basicPod,
+                                            appendPod,
+                                            effectiveConfig.get(
+                                                    KubernetesOperatorConfigOptions
+                                                            .POD_TEMPLATE_MERGE_BY_NAME)),
+                                    resource)));
         }
     }
 
+    private static Pod applyResourceToPodTemplate(Pod podTemplate, Resource resource) {
+        if (resource != null
+                && !StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())
+                && podTemplate != null
+                && podTemplate.getSpec() != null) {
+            for (Container container : podTemplate.getSpec().getContainers()) {

Review Comment:
   Should this apply for all containers or only to the main jm/tm container? `Constants.MAIN_CONTAINER_NAME` 
   I think only to the main one.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -409,15 +419,44 @@ private static void setPodTemplate(
             effectiveConfig.setString(
                     podConfigOption,
                     createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+                            applyResourceToPodTemplate(
+                                    mergePodTemplates(
+                                            basicPod,
+                                            appendPod,
+                                            effectiveConfig.get(
+                                                    KubernetesOperatorConfigOptions
+                                                            .POD_TEMPLATE_MERGE_BY_NAME)),
+                                    resource)));
         }
     }
 
+    private static Pod applyResourceToPodTemplate(Pod podTemplate, Resource resource) {
+        if (resource != null
+                && !StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())
+                && podTemplate != null
+                && podTemplate.getSpec() != null) {

Review Comment:
   We need some more test coverage for different configs:
    1. Ephemeral storage settings without any podtemplate 
    2. Ephemeral storage without tm/jm podTemplate 
    3. Ephemeral storage without spec in podTemplate



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -409,15 +419,44 @@ private static void setPodTemplate(
             effectiveConfig.setString(
                     podConfigOption,
                     createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+                            applyResourceToPodTemplate(
+                                    mergePodTemplates(

Review Comment:
   I think we should not nest these calls but do one after the other for readability



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] HuangZhenQiu commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "HuangZhenQiu (via GitHub)" <gi...@apache.org>.
HuangZhenQiu commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1157758601


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -391,31 +399,87 @@ private static void setResource(
     }
 
     private static void setPodTemplate(
-            Pod basicPod, Pod appendPod, Configuration effectiveConfig, boolean isJM)
+            Pod basicPod,
+            Pod appendPod,
+            Resource resource,
+            Configuration effectiveConfig,
+            boolean isJM)
             throws IOException {
 
-        if (basicPod == null && appendPod == null) {
-            return;
-        }
-
         // Avoid to create temporary pod template files for JobManager and TaskManager if it is not
         // configured explicitly via .spec.JobManagerSpec.podTemplate or
         // .spec.TaskManagerSpec.podTemplate.
+        final ConfigOption<String> podConfigOption =
+                isJM
+                        ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
+                        : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
+
+        Pod basicPodWithResource = applyResourceToPodTemplate(basicPod, resource);

Review Comment:
   Yes, you are right. I think the best way to add the feature without breaking existing supported functionality is that:
   
   Always merge first as long as not both basicPod and appendPod are null, then apply resource to pod.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] HuangZhenQiu commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "HuangZhenQiu (via GitHub)" <gi...@apache.org>.
HuangZhenQiu commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1156777913


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -366,6 +368,18 @@ private Optional<String> validateResources(String component, Resource resource)
             return Optional.of(component + " resource memory parse error: " + iae.getMessage());
         }
 
+        String storage = resource.getEphemeralStorage();

Review Comment:
   Got catch. Thanks!



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -409,15 +419,44 @@ private static void setPodTemplate(
             effectiveConfig.setString(
                     podConfigOption,
                     createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+                            applyResourceToPodTemplate(
+                                    mergePodTemplates(

Review Comment:
   Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#issuecomment-1495587834

   > I'm wondering if it won't be cleaner to let flink manage the resources field as it is done for cpu, memory and external resources (see https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java#L168)
   > and having the operator only providing the ephemeral storage config inside the flink configuration?
   
   That would be nice but even if we add this to a newer Flink version, this logic is required to support this to current / older versions .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1157015140


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -391,31 +399,87 @@ private static void setResource(
     }
 
     private static void setPodTemplate(
-            Pod basicPod, Pod appendPod, Configuration effectiveConfig, boolean isJM)
+            Pod basicPod,
+            Pod appendPod,
+            Resource resource,
+            Configuration effectiveConfig,
+            boolean isJM)
             throws IOException {
 
-        if (basicPod == null && appendPod == null) {
-            return;
-        }
-
         // Avoid to create temporary pod template files for JobManager and TaskManager if it is not
         // configured explicitly via .spec.JobManagerSpec.podTemplate or
         // .spec.TaskManagerSpec.podTemplate.
+        final ConfigOption<String> podConfigOption =
+                isJM
+                        ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
+                        : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
+
+        Pod basicPodWithResource = applyResourceToPodTemplate(basicPod, resource);

Review Comment:
   Honestly I don't think this logic will work like this due to how Pod merging works by default. I recommend testing the following example which is likely to be broken currently:
   
   1. Top level (common) pod template empty.
   2. TM/JM level pod Template define 2 containers: [init, main-container] , in this order
   
   I think the result will not be what you expect.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #561: [FLINK-30609] Add ephemeral storage to CRD

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #561:
URL: https://github.com/apache/flink-kubernetes-operator/pull/561#discussion_r1157007742


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -391,31 +399,87 @@ private static void setResource(
     }
 
     private static void setPodTemplate(
-            Pod basicPod, Pod appendPod, Configuration effectiveConfig, boolean isJM)
+            Pod basicPod,
+            Pod appendPod,
+            Resource resource,
+            Configuration effectiveConfig,
+            boolean isJM)
             throws IOException {
 
-        if (basicPod == null && appendPod == null) {
-            return;
-        }
-
         // Avoid to create temporary pod template files for JobManager and TaskManager if it is not
         // configured explicitly via .spec.JobManagerSpec.podTemplate or
         // .spec.TaskManagerSpec.podTemplate.
+        final ConfigOption<String> podConfigOption =
+                isJM
+                        ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
+                        : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
+
+        Pod basicPodWithResource = applyResourceToPodTemplate(basicPod, resource);
+
         if (appendPod != null) {
-            final ConfigOption<String> podConfigOption =
-                    isJM
-                            ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
-                            : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
-            effectiveConfig.setString(
-                    podConfigOption,
-                    createTempFile(
-                            mergePodTemplates(
-                                    basicPod,
-                                    appendPod,
-                                    effectiveConfig.get(
-                                            KubernetesOperatorConfigOptions
-                                                    .POD_TEMPLATE_MERGE_BY_NAME))));
+            Pod mergedPodTemplate =
+                    mergePodTemplates(
+                            basicPodWithResource,
+                            appendPod,
+                            effectiveConfig.get(
+                                    KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME));
+            effectiveConfig.setString(podConfigOption, createTempFile(mergedPodTemplate));
+        } else {
+            effectiveConfig.setString(podConfigOption, createTempFile(basicPodWithResource));
+        }
+    }
+
+    private static Pod applyResourceToPodTemplate(Pod podTemplate, Resource resource) {
+        if (resource == null
+                || StringUtils.isNullOrWhitespaceOnly(resource.getEphemeralStorage())) {
+            return podTemplate;
         }
+
+        if (podTemplate == null || podTemplate.getSpec() == null) {
+            PodSpec spec = new PodSpec();
+            spec.getContainers()
+                    .add(
+                            decorateContainerWithEphemeralStorage(
+                                    new Container(), resource.getEphemeralStorage()));
+            Pod newPodTemplate = new Pod();
+            newPodTemplate.setSpec(spec);
+            return newPodTemplate;

Review Comment:
   If only the spec is null, this logic overwrites the entire pod, that sounds incorrect. Imagine the user only set the metadata. Please add a test to guard against this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org