Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/11 02:50:47 UTC

[GitHub] [flink-kubernetes-operator] FuyaoLi2017 opened a new pull request, #202: [FLINK-27483]Add session job config field

FuyaoLi2017 opened a new pull request, #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202

   Add a session job configuration field. This allows users to add custom HTTP headers when they use the HTTP-based jar fetcher for session jobs.
   
   This is not tested yet, and the code might need refactoring during review. There are currently no unit tests for this change (they still need to be designed and implemented).
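   As a rough illustration of what a custom header means for an HTTP-based jar fetcher, the sketch below builds a jar download request with user-supplied headers using the JDK's `java.net.http.HttpRequest`. The `JarDownloadRequests` class is hypothetical and is not the operator's fetcher; it only constructs the request and never sends it.
   
   ```java
   import java.net.URI;
   import java.net.http.HttpRequest;
   import java.util.Map;
   
   // Hypothetical helper for illustration; not the operator's HTTP artifact fetcher.
   final class JarDownloadRequests {
       private JarDownloadRequests() {}
   
       /** Builds a GET request for the jar URI and attaches every user-configured header. */
       static HttpRequest build(String jarUri, Map<String, String> headers) {
           HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(jarUri)).GET();
           // Each map entry becomes one request header, e.g. "Authorization: Bearer ..."
           headers.forEach(builder::header);
           return builder.build();
       }
   }
   ```
   
   Note that `HttpRequest.Builder.header` throws `IllegalArgumentException` for invalid or implementation-restricted header names (such as `Host`), so a real fetcher might want to validate user input first.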


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869905741


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+    SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   I am not suggesting changing the logic, just the location of the constant you defined, so that we don't create unnecessary new classes and spread the options around too much :)
   
   The operator should always use the config defined in the resource (session job or deployment); there is no difference between the two. We usually apply this on top of the default config defined in the operator.
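   A minimal sketch of the layering described above, assuming configurations are modelled as plain string maps (the operator itself works with Flink's `Configuration` class): operator defaults form the base, the deployment's `flinkConfiguration` is applied on top, and for a session job its own `flinkConfiguration` is applied last. `LayeredConfig` and `resolve` are illustrative names only.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Illustrative only: models configs as String maps instead of Flink's Configuration.
   final class LayeredConfig {
       private LayeredConfig() {}
   
       /** Applies each layer in order; later layers win on key collisions. */
       @SafeVarargs
       static Map<String, String> resolve(Map<String, String>... layers) {
           Map<String, String> effective = new HashMap<>();
           for (Map<String, String> layer : layers) {
               if (layer != null) { // a resource may not define any overrides
                   effective.putAll(layer);
               }
           }
           return effective;
       }
   }
   ```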





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r873198656


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -141,4 +142,11 @@ public class KubernetesOperatorConfigOptions {
                     .noDefaultValue()
                     .withDescription(
                             "Whether to enable recovery of missing/deleted jobmanager deployments. False by default for Flink 1.14, true for newer Flink version.");
+
+    public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
+            ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom HTTP header for a Flink job. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");

Review Comment:
   Could we change this to: 
   ```
   Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts.  Expected format: headerKey1:headerValue1,headerKey2:headerValue2.
   ```
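   To make the expected format `headerKey1:headerValue1,headerKey2:headerValue2` concrete, here is a deliberately simplified parser. It is a sketch under stated assumptions: it splits entries on `,` and each entry at its first `:`, so keys must not contain `:` and values must not contain `,`. As far as I know, Flink's own parsing of `mapType()` options is more elaborate (it supports quoting), which this version does not. `HeaderOptionParser` is a hypothetical name.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   // Hypothetical, simplified parser: no quoting or escaping support.
   final class HeaderOptionParser {
       private HeaderOptionParser() {}
   
       /** Parses "k1:v1,k2:v2" into an insertion-ordered map. */
       static Map<String, String> parse(String raw) {
           Map<String, String> headers = new LinkedHashMap<>();
           if (raw == null || raw.isBlank()) {
               return headers;
           }
           for (String entry : raw.split(",")) {
               int sep = entry.indexOf(':'); // split at the first ':' only
               if (sep <= 0) {
                   throw new IllegalArgumentException("Malformed header entry: '" + entry + "'");
               }
               headers.put(entry.substring(0, sep).trim(), entry.substring(sep + 1).trim());
           }
           return headers;
       }
   }
   ```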





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872488147


##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster.
+You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. 
+The session job level configuration will override the parent session cluster's Flink configuration.

Review Comment:
   If we can enumerate the possible configuration options and keys, we can simply add a method to the validator that checks that the user doesn't try to set something that won't take effect.
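   One possible shape for such a check, sketched under the assumption that the allowed session job keys can be listed up front (in this PR, only `kubernetes.operator.user.artifacts.http.header`). `SessionJobConfigValidator` and its `Optional<String>` error convention are hypothetical and are not the operator's `DefaultValidator` API.
   
   ```java
   import java.util.Map;
   import java.util.Optional;
   import java.util.Set;
   import java.util.TreeSet;
   
   // Hypothetical validator sketch; not the operator's DefaultValidator.
   final class SessionJobConfigValidator {
       /** Keys that actually take effect when set on a FlinkSessionJob. */
       static final Set<String> ALLOWED_KEYS =
               Set.of("kubernetes.operator.user.artifacts.http.header");
   
       private SessionJobConfigValidator() {}
   
       /** Returns an error message if the session job sets keys that would be silently ignored. */
       static Optional<String> validate(Map<String, String> sessionJobConfig) {
           if (sessionJobConfig == null) {
               return Optional.empty();
           }
           Set<String> rejected = new TreeSet<>(sessionJobConfig.keySet()); // sorted for stable messages
           rejected.removeAll(ALLOWED_KEYS);
           return rejected.isEmpty()
                   ? Optional.empty()
                   : Optional.of("Unsupported session job configuration keys: " + rejected);
       }
   }
   ```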





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872700622


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -110,5 +110,11 @@
             <td>String</td>
             <td>The base dir to put the session job artifacts.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.user.artifacts.http.header</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Map</td>
+            <td>Custom HTTP header for a Flink job. If configured in cluster level, headers will be applied to all jobs within the cluster. This field can also be configured under spec.job.flinkConfiguration for a specific session job within a session cluster. If configured at session job level, it will override the cluster level configuration. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.</td>

Review Comment:
   This is a generated HTML page, so I guess we don't need to edit it manually.





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872636486


##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.

Review Comment:
   I think this should be correct. If you take a look at the CRD, this property sits at `spec.job.flinkConfiguration` from the root of the CRD.





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872701915


##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster.
+You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. 
+The session job level configuration will override the parent session cluster's Flink configuration.

Review Comment:
   Added related docs and validator code changes.





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872721505


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java:
##########
@@ -83,6 +84,13 @@ public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws E
 
         Configuration deployedConfig = configManager.getObserveConfig(flinkDepOptional.get());
 
+        // merge session job specific config
+        Map<String, String> sessionJobFlinkConfiguration =

Review Comment:
   I created a `getSessionJobObserveConfig` method in `FlinkConfigManager` and used it to replace this code and a few other call sites.





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872873594


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
         return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
     }
 
+    public Configuration getSessionJobObserveConfig(
+            FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+        Configuration sessionJobConfig = getObserveConfig(deployment);
+
+        // merge session job specific config
+        Map<String, String> sessionJobFlinkConfiguration =
+                flinkSessionJob.getSpec().getFlinkConfiguration();
+        if (sessionJobFlinkConfiguration != null && !sessionJobFlinkConfiguration.isEmpty()) {
+            sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);

Review Comment:
   I guess the logic would be more straightforward if this config in the session job directly overrides the parent config.
   
   If we merge them instead, a user would need to check two CRs instead of one to determine which headers are actually applied during jar fetching when debugging. That could be a little confusing.
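   To illustrate the two policies being compared, the sketch below assumes the header option has already been parsed into maps. In the diff's per-key `setString` loop, all headers live under the single key `kubernetes.operator.user.artifacts.http.header`, so a session job that sets this option replaces the cluster's whole header value; that corresponds to `override` below. `HeaderPolicies` and its methods are illustrative names.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Illustrative comparison of the two policies; names are hypothetical.
   final class HeaderPolicies {
       private HeaderPolicies() {}
   
       /** Direct override: job is null when the session job does not set the header option. */
       static Map<String, String> override(Map<String, String> cluster, Map<String, String> job) {
           return job != null ? Map.copyOf(job) : Map.copyOf(cluster);
       }
   
       /** Merge: start from the cluster headers and let job headers win per header name. */
       static Map<String, String> merge(Map<String, String> cluster, Map<String, String> job) {
           Map<String, String> merged = new HashMap<>(cluster);
           if (job != null) {
               merged.putAll(job);
           }
           return merged;
       }
   }
   ```
   
   With `override`, only the session job CR has to be inspected to know which headers are sent; with `merge`, the effective headers depend on both CRs.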





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r873206110


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -141,4 +142,11 @@ public class KubernetesOperatorConfigOptions {
                     .noDefaultValue()
                     .withDescription(
                             "Whether to enable recovery of missing/deleted jobmanager deployments. False by default for Flink 1.14, true for newer Flink version.");
+
+    public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
+            ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom HTTP header for a Flink job. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");

Review Comment:
   Sure. I just pushed the change.





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#issuecomment-1125653576

   The CI pipeline shows some unit test failures, but they pass locally for me. I guess it might be a cascading error caused by the documentation? I didn't rebuild the docs earlier since I am not familiar with the procedure.
   ```
   mvn clean install -DskipTests -Pgenerate-docs
   ```
   I ran the command above, generated some docs and pushed again...




[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872863534


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java:
##########
@@ -33,7 +34,8 @@ public class FileSystemBasedArtifactFetcher implements ArtifactFetcher {
             new FileSystemBasedArtifactFetcher();
 
     @Override
-    public File fetch(String uri, File targetDir) throws Exception {
+    public File fetch(String uri, Configuration flinkConfiguration, File targetDir)

Review Comment:
   No, it is not used. I need to change this line since this class implements the `ArtifactFetcher` interface. We might need this configuration in the future? Not sure. Anyway, it doesn't hurt.
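   A simplified model of why the unused parameter is needed: once the interface method takes the configuration, every implementation must accept it, even one that ignores it. This sketch substitutes a `Map<String, String>` for Flink's `Configuration`, treats the URI as a plain local path, and uses hypothetical names (`SimpleArtifactFetcher`, `LocalFileFetcher`); it is not the operator's code.
   
   ```java
   import java.io.File;
   import java.nio.file.Files;
   import java.nio.file.StandardCopyOption;
   import java.util.Map;
   
   // Hypothetical model; Map<String, String> stands in for Flink's Configuration.
   interface SimpleArtifactFetcher {
       File fetch(String uri, Map<String, String> flinkConfiguration, File targetDir) throws Exception;
   }
   
   /** Copies a local file; the configuration is accepted only to satisfy the interface. */
   final class LocalFileFetcher implements SimpleArtifactFetcher {
       @Override
       public File fetch(String uri, Map<String, String> flinkConfiguration, File targetDir)
               throws Exception {
           File source = new File(uri); // simplification: uri is a plain local path
           File target = new File(targetDir, source.getName());
           Files.copy(source.toPath(), target.toPath(), StandardCopyOption.REPLACE_EXISTING);
           return target;
       }
   }
   ```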





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869878137


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+    SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   I think this can all simply be part of `KubernetesOperatorConfigOptions`; there is no need for a new class.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkSessionJobSpec.java:
##########
@@ -38,4 +40,7 @@ public class FlinkSessionJobSpec extends AbstractFlinkSpec {
 
     /** The name of the target session cluster deployment. */
     private String deploymentName;
+
+    /** Session job specific configuration */
+    private Map<String, String> sessionJobFlinkConfiguration;

Review Comment:
   Instead of specifying a new config here, let's move the `flinkConfiguration` field of `FlinkDeploymentSpec` up to `AbstractFlinkSpec`; that way it will be inherited.
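   A stripped-down sketch of the suggested refactoring: with `flinkConfiguration` declared once in the shared base class, both `FlinkDeploymentSpec` and `FlinkSessionJobSpec` inherit it. These are simplified stand-ins with most real fields omitted, and the accessors are written out by hand so the example compiles on its own.
   
   ```java
   import java.util.Map;
   
   // Simplified stand-ins for the CRD spec classes; most real fields are omitted.
   abstract class AbstractFlinkSpec {
       /** Flink configuration overrides, shared by deployments and session jobs. */
       private Map<String, String> flinkConfiguration;
   
       public Map<String, String> getFlinkConfiguration() {
           return flinkConfiguration;
       }
   
       public void setFlinkConfiguration(Map<String, String> flinkConfiguration) {
           this.flinkConfiguration = flinkConfiguration;
       }
   }
   
   class FlinkDeploymentSpec extends AbstractFlinkSpec {
       // deployment-only fields omitted
   }
   
   class FlinkSessionJobSpec extends AbstractFlinkSpec {
       /** The name of the target session cluster deployment. */
       private String deploymentName;
   
       public String getDeploymentName() {
           return deploymentName;
       }
   
       public void setDeploymentName(String deploymentName) {
           this.deploymentName = deploymentName;
       }
   }
   ```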





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869897712


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+    SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   Hi @gyfora, if we put this configuration in `KubernetesOperatorConfigOptions`, does that imply you intend to make this header config field a candidate for `flink-conf.yaml`? In that case, are we assuming the headers are the same for all session jobs within the cluster? I think we could allow users to configure different headers for different session jobs within one session cluster.





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869903829


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+    SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   Hi @gyfora, if not, and you intend to keep this new `flinkConfiguration` job specific, how do we control the behavior in the session cluster, since it could run multiple different jobs?
   Taking a step back, we could assume the headers are all the same for a given cluster. That would make things simpler, but might NOT be very flexible.
   cc @wangyang0918 
   See Yang's comment.
   https://issues.apache.org/jira/browse/FLINK-27483?focusedCommentId=17532137&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532137





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#issuecomment-1126414287

   @FuyaoLi2017 no worries, I have gone through the code and added comments. Mostly minor stuff :)
   
   No need to worry about force pushing; this is your branch, and it's necessary to force push PR branches to keep them clean.
   I also generally squash them together into 1-2 commits.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#issuecomment-1126402997

   Hi @Aitozi @gyfora, thanks for reviewing this. I have updated the code based on your suggestions.
   
   1. Fix typos for `spec.job.flinkConfiguration`.
   2. Update the documentation about the session job Flink configuration and list the allowed configurations (only the header config at the moment).
   3. Add validator logic in `DefaultValidator`. Also rename the existing `validateFlinkConfig` method to `validateFlinkDeploymentConfig` to avoid confusion.
   4. Move the configuration merge logic into `FlinkConfigManager`: add a `getSessionJobObserveConfig` method and refactor a few places to call it instead of `getObserveConfig`.
   
   There were some merge conflicts, so I rebased onto the main branch. I ran into some errors that introduced unexpected changes, so I force pushed the code to make it clean. Sorry for making the commit history a bit messy.






[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#issuecomment-1126082667

   Thank you @FuyaoLi2017, I will try to review this by Monday at the latest, as I will be travelling over the weekend.
   
   @Aitozi if you have some spare capacity I would appreciate a review, but we can also do this on Monday :)




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872916175


##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,12 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster.
+  - You can configure some additional job specific supplemental configuration through `spec.flinkConfiguration` in `FlinkSessionJob` CustomResource. 
+  Those session job level configurations will override the parent session cluster's Flink configuration. Please note only the following configurations are considered to be valid configurations.
+    - `kubernetes.operator.user.artifacts.http.header`

Review Comment:
   I think it's much clearer now 👍🏻.
   Another thought that comes to mind: maybe we could extend `ConfigOption` to mark which component (Flink or operator) a config belongs to. That's out of scope for this PR; we could do it later if necessary.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872744006


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -141,4 +142,15 @@ public class KubernetesOperatorConfigOptions {
                     .noDefaultValue()
                     .withDescription(
                             "Whether to enable recovery of missing/deleted jobmanager deployments. False by default for Flink 1.14, true for newer Flink version.");
+
+    public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
+            ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom HTTP header for a Flink job. If configured in cluster level, headers will be applied to all jobs within"

Review Comment:
   I think the description should detail what the config is good for. Does it affect artifact downloading?
   
   It doesn't need to explain how configurations work; we have general docs for that. So I would remove this part completely:
   ```
   If configured in cluster level, headers will be applied to all jobs within the cluster. This field can also be configured under spec.job.flinkConfiguration for a specific session job within a session cluster. If configured at session job level, it will override the cluster level configuration.
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
         return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
     }
 
+    public Configuration getSessionJobObserveConfig(

Review Comment:
   I think it's a bit confusing to call it `getSessionJobObserveConfig`. I would prefer to call it simply `getSessionJobConfig`



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
         return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
     }
 
+    public Configuration getSessionJobObserveConfig(
+            FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+        Configuration sessionJobConfig = getObserveConfig(deployment);
+
+        // merge session job specific config
+        Map<String, String> sessionJobFlinkConfiguration =
+                flinkSessionJob.getSpec().getFlinkConfiguration();
+        if (sessionJobFlinkConfiguration != null && !sessionJobFlinkConfiguration.isEmpty()) {

Review Comment:
   The `!sessionJobFlinkConfiguration.isEmpty()` check is not necessary
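
A minimal sketch of the merge step under review. It uses the name `getSessionJobConfig` suggested above and plain `Map<String, String>` objects in place of Flink's `Configuration`. The class and method here are hypothetical and are not the operator's code. It shows why only the null check is needed: `forEach` over an empty map is simply a no-op. The sketch also copies the base map first. That copy is an assumption, made so the deployment-level settings are never mutated.

```java
import java.util.HashMap;
import java.util.Map;

public class SessionJobConfigMergeSketch {

    /**
     * Hypothetical stand-in for a FlinkConfigManager#getSessionJobConfig method: returns a copy
     * of the deployment-level config with the session job's overrides applied on top.
     */
    static Map<String, String> getSessionJobConfig(
            Map<String, String> deploymentConfig, Map<String, String> sessionJobOverrides) {
        // Work on a copy so the deployment-level settings are left untouched.
        Map<String, String> merged = new HashMap<>(deploymentConfig);
        // A null check is sufficient: forEach over an empty map does nothing.
        if (sessionJobOverrides != null) {
            sessionJobOverrides.forEach(merged::put);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> base = Map.of("a", "1", "b", "2");
        System.out.println(getSessionJobConfig(base, null).equals(base)); // true
        System.out.println(getSessionJobConfig(base, Map.of()).equals(base)); // true
        System.out.println(getSessionJobConfig(base, Map.of("b", "3")).get("b")); // 3
    }
}
```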



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java:
##########
@@ -41,6 +42,7 @@ public class FlinkOperatorConfiguration {
     Duration flinkCancelJobTimeout;
     Duration flinkShutdownClusterTimeout;
     String artifactsBaseDir;
+    Map<String, String> artifactHttpHeader;

Review Comment:
   We shouldn't add this extra field here: it's not an operator configuration, and it's also not used anywhere.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java:
##########
@@ -33,7 +34,8 @@ public class FileSystemBasedArtifactFetcher implements ArtifactFetcher {
             new FileSystemBasedArtifactFetcher();
 
     @Override
-    public File fetch(String uri, File targetDir) throws Exception {
+    public File fetch(String uri, Configuration flinkConfiguration, File targetDir)

Review Comment:
   Why did you add the config field here? Is it used?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -404,4 +409,21 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
         }
         return Optional.empty();
     }
+
+    private Optional<String> validateFlinkSessionJobConfig(
+            Map<String, String> flinkSessionJobConfig) {
+        if (flinkSessionJobConfig == null) {
+            return Optional.empty();
+        }
+
+        for (String key : flinkSessionJobConfig.keySet()) {
+            if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {

Review Comment:
   Could we please add a simple test for this into the validator test?
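
The following sketches the kinds of cases a validator test could cover. The method mirrors `validateFlinkSessionJobConfig` from the diff, but it is a standalone, hypothetical copy. Two parts of it are assumptions, because the diff is truncated at that point. The first is the contents of `ALLOWED_FLINK_SESSION_JOB_CONF_KEYS`, assumed here to hold only the header option from this PR. The second is the error message text.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class SessionJobConfigValidatorSketch {

    // Assumption: the allow-list holds only the header option introduced in this PR.
    static final Set<String> ALLOWED_FLINK_SESSION_JOB_CONF_KEYS =
            Set.of("kubernetes.operator.user.artifacts.http.header");

    /** Returns an error for the first key that is not allowed, or empty if all keys pass. */
    static Optional<String> validateFlinkSessionJobConfig(
            Map<String, String> flinkSessionJobConfig) {
        if (flinkSessionJobConfig == null) {
            return Optional.empty();
        }
        for (String key : flinkSessionJobConfig.keySet()) {
            if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {
                // Hypothetical message text.
                return Optional.of("Invalid session job flinkConfiguration key: " + key);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Null config, an allowed key and a disallowed key.
        System.out.println(validateFlinkSessionJobConfig(null).isPresent()); // false
        System.out.println(
                validateFlinkSessionJobConfig(
                                Map.of("kubernetes.operator.user.artifacts.http.header", "X-Api-Key:secret"))
                        .isPresent()); // false
        System.out.println(
                validateFlinkSessionJobConfig(Map.of("taskmanager.numberOfTaskSlots", "2"))
                        .isPresent()); // true
    }
}
```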



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java:
##########
@@ -32,12 +37,26 @@ public class HttpArtifactFetcher implements ArtifactFetcher {
     public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher();
 
     @Override
-    public File fetch(String uri, File targetDir) throws Exception {
+    public File fetch(String uri, Configuration flinkConfiguration, File targetDir)
+            throws Exception {
         var start = System.currentTimeMillis();
         URL url = new URL(uri);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+
+        // merged session job level header and cluster level header, session job level header take
+        // precedence.
+        Map<String, String> headers =
+                flinkConfiguration.get(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER);
+
+        if (headers != null && headers.size() > 0) {

Review Comment:
   size check not necessary
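
A minimal sketch of attaching the configured headers with `HttpURLConnection#setRequestProperty`, where a null check alone is enough. The helper `openWithHeaders` and its plain-map input are hypothetical. In the PR, the map would come from `flinkConfiguration.get(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER)`.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Map;

public class HttpHeaderSketch {

    /**
     * Hypothetical helper: opens (but does not connect) an HTTP connection and attaches the
     * configured headers. A null map means "no headers configured"; an empty map needs no
     * special case because forEach over it does nothing.
     */
    static HttpURLConnection openWithHeaders(String uri, Map<String, String> headers)
            throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(uri).toURL().openConnection();
        if (headers != null) {
            headers.forEach(conn::setRequestProperty);
        }
        return conn;
    }

    public static void main(String[] args) throws IOException {
        HttpURLConnection conn =
                openWithHeaders("http://example.com/job.jar", Map.of("X-Api-Key", "secret"));
        // Nothing has been sent yet; this only reads back the request property.
        System.out.println(conn.getRequestProperty("X-Api-Key")); // secret
    }
}
```

`openConnection()` only creates the connection object, so the headers can be inspected before any network I/O happens. `getRequestProperty` deliberately returns null for `Authorization`, which is why the demo reads back a custom header instead.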



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
         return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
     }
 
+    public Configuration getSessionJobObserveConfig(
+            FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+        Configuration sessionJobConfig = getObserveConfig(deployment);
+
+        // merge session job specific config
+        Map<String, String> sessionJobFlinkConfiguration =
+                flinkSessionJob.getSpec().getFlinkConfiguration();
+        if (sessionJobFlinkConfiguration != null && !sessionJobFlinkConfiguration.isEmpty()) {
+            sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);

Review Comment:
   I am wondering for this particular case, would it make sense to merge the HTTP headers instead of overwriting them if the base config also defined it?
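
One possible merge policy for the question above, sketched under the assumption that both levels hold a header map. The sketch starts from the cluster-level headers and lets the session job override only the keys it defines, rather than replacing the whole map. `mergeHeaders` is a hypothetical helper and is not part of the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderMergeSketch {

    /**
     * Hypothetical merge policy: keep all cluster-level headers and let session-job headers
     * win only for the keys they define, instead of replacing the whole map.
     */
    static Map<String, String> mergeHeaders(
            Map<String, String> clusterHeaders, Map<String, String> jobHeaders) {
        Map<String, String> merged = new LinkedHashMap<>();
        if (clusterHeaders != null) {
            merged.putAll(clusterHeaders);
        }
        if (jobHeaders != null) {
            merged.putAll(jobHeaders); // job-level value takes precedence per key
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = Map.of("X-Team", "data", "X-Api-Key", "default");
        Map<String, String> job = Map.of("X-Api-Key", "job-secret");
        Map<String, String> merged = mergeHeaders(cluster, job);
        System.out.println(merged.get("X-Team")); // data
        System.out.println(merged.get("X-Api-Key")); // job-secret
    }
}
```

HTTP header names are case-insensitive, but this sketch compares keys case-sensitively. A real implementation might normalize the keys first.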





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872450640


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java:
##########
@@ -32,12 +37,24 @@ public class HttpArtifactFetcher implements ArtifactFetcher {
     public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher();
 
     @Override
-    public File fetch(String uri, File targetDir) throws Exception {
+    public File fetch(String uri, Configuration flinkConfiguration, File targetDir)
+            throws Exception {
         var start = System.currentTimeMillis();
         URL url = new URL(uri);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+
+        Map<String, String> clusterLevelHeader =

Review Comment:
   We do not know whether this header is cluster level or job level here. What about naming it simply `headers`?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java:
##########
@@ -40,4 +42,7 @@ public abstract class AbstractFlinkSpec {
      * restart, change the number to anything other than the current value.
      */
     private Long restartNonce;
+
+    /** Flink configuration overrides for the Flink deployment. */

Review Comment:
   nit: the comment should be updated to: overrides for the Flink deployment or session job.



##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster.
+You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. 
+The session job level configuration will override the parent session cluster's Flink configuration.

Review Comment:
   I think we should let users know that the cluster-level Flink config (I mean the session cluster's config) is not actually overridden by the session job's flinkConfiguration. Personally, I think what the session job's `spec.flinkConfiguration` defines is not the **Flink** configuration, just some custom configs.



##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.

Review Comment:
   ditto



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java:
##########
@@ -83,6 +84,13 @@ public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws E
 
         Configuration deployedConfig = configManager.getObserveConfig(flinkDepOptional.get());
 
+        // merge session job specific config
+        Map<String, String> sessionJobFlinkConfiguration =

Review Comment:
   nit: maybe we could move this to the `FlinkConfigManager` 



##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster.
+You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. 

Review Comment:
   A newline here 



##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.

Review Comment:
   Do you mean `spec.flinkConfiguration`?



##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -110,5 +110,11 @@
             <td>String</td>
             <td>The base dir to put the session job artifacts.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.user.artifacts.http.header</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Map</td>
+            <td>Custom HTTP header for a Flink job. If configured in cluster level, headers will be applied to all jobs within the cluster. This field can also be configured under spec.job.flinkConfiguration for a specific session job within a session cluster. If configured at session job level, it will override the cluster level configuration. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.</td>

Review Comment:
   ditto
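
The generated documentation row above gives the expected value format as `headerKey1:headerValue1,headerKey2:headerValue2`. A simplified, hypothetical parser makes that mapping concrete. It splits on commas, then splits each entry on the first colon only. Flink's own map-type option parsing may handle quoting and escaping differently, so treat this only as an illustration of the format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderFormatSketch {

    /**
     * Simplified, hypothetical parser for "k1:v1,k2:v2". Entries are split on ',' and each
     * entry on its first ':' only, so values may contain further colons. No quoting support.
     */
    static Map<String, String> parseHeaders(String value) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            int idx = entry.indexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("Expected key:value but got '" + entry + "'");
            }
            headers.put(entry.substring(0, idx).trim(), entry.substring(idx + 1).trim());
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(parseHeaders("headerKey1:headerValue1,headerKey2:headerValue2"));
        // {headerKey1=headerValue1, headerKey2=headerValue2}
    }
}
```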





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872722982


##########
docs/content/docs/operations/configuration.md:
##########
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster.
+You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job. 

Review Comment:
   I added a new line at the end.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869906352


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+    SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   But I would allow users to put the header conf in the default Flink conf YAML, so that a default value is applied when the session job doesn't want to override it.





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r870048652


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+    SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   We just need to document clearly which configuration options take effect in `FlinkSessionJob.spec.flinkConfiguration`.





[GitHub] [flink-kubernetes-operator] FuyaoLi2017 commented on pull request #202: [FLINK-27483]Add session job config field

Posted by GitBox <gi...@apache.org>.
FuyaoLi2017 commented on PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#issuecomment-1125524916

   Hi @gyfora , I noticed this error.
   ```
   Error:  Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check (default) on project flink-kubernetes-operator-parent: Too many files with unapproved license: 1 See RAT report in: /home/runner/work/flink-kubernetes-operator/flink-kubernetes-operator/target/rat.txt -> [Help 1]
   ```
   I don't know how to fix this. I can build the project successfully on my local laptop.

