Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/16 13:15:30 UTC

[GitHub] [flink-kubernetes-operator] Aitozi opened a new pull request, #168: [FLINK-27161] Support fetch user jar from different source

Aitozi opened a new pull request, #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168

   This PR adds support for fetching the user jar from different sources.
   
   **The brief change log:**
   
   1. Introduced the `ArtifactFetcher` interface, with two implementations that support `http`- and `filesystem`-based resources.
   2. Copied the OSS filesystem jar to the plugins dir as the default/example jar. Users can support other filesystems by copying the corresponding fs jars to the plugins dir.
   
   The local filesystem and HTTP fetchers are verified by unit tests; the OSS resource was verified manually.
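
A minimal sketch of the contract described in item 1, assuming the `fetch(String, File)` signature that the two implementations quoted later in this thread override. `FetcherSelection` and `fetcherKindFor` are hypothetical names used only to illustrate the scheme check; they are not part of the PR.

```java
import java.io.File;
import java.net.URI;

/** Sketch of the fetcher contract; the signature mirrors the fetch(...) overrides quoted in this thread. */
interface ArtifactFetcher {
    File fetch(String uri, File targetDir) throws Exception;
}

/** Hypothetical helper (not in the PR) showing how a jar URI maps to a fetcher kind. */
final class FetcherSelection {

    private FetcherSelection() {}

    /** Returns "http" for http/https URIs and "filesystem" for everything else. */
    static String fetcherKindFor(String jarUri) {
        // URI.create throws an unchecked IllegalArgumentException for malformed input.
        String scheme = URI.create(jarUri).getScheme();
        if ("http".equals(scheme) || "https".equals(scheme)) {
            return "http";
        }
        // Any other scheme (file, oss, ...) or no scheme at all goes to the filesystem path.
        return "filesystem";
    }
}
```

Under this sketch, any scheme other than `http`/`https` falls through to the filesystem-based fetcher, which is why support for a new storage system would only need its filesystem plugin jar.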


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854910425


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -75,6 +75,12 @@ under the License.
             <version>${flink.version}</version>
         </dependency>
 
+        <dependency>

Review Comment:
   Since the `flink-clients` dependency already includes `flink-core`, I removed the duplicate declaration here.





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854741196


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -204,12 +209,14 @@ private JarRunResponseBody runJar(
         }
     }
 
-    private JarUploadResponseBody uploadJar(FlinkSessionJob sessionJob, Configuration conf)
+    private JarUploadResponseBody uploadJar(
+            FlinkSessionJob sessionJob, FlinkDeployment sessionCluster, Configuration conf)
             throws Exception {
-        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        String targetDir = artifactManager.generateJarDir(sessionCluster, sessionJob);
+        File jarFile = artifactManager.fetch(sessionJob.getSpec().getJob().getJarURI(), targetDir);

Review Comment:
   Let's keep the current behavior and do some improvements later if necessary.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r855748943


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+
+/** Manage the user artifacts. */
+public class ArtifactManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ArtifactManager.class);
+    private final File baseDir;
+
+    public ArtifactManager(FlinkOperatorConfiguration operatorConfiguration) {
+        this.baseDir = new File(operatorConfiguration.getArtifactsBaseDir());
+    }
+
+    private synchronized void createIfNotExists(File targetDir) {
+        if (!targetDir.exists()) {
+            try {
+                FileUtils.forceMkdirParent(targetDir);
+                LOG.info("Created dir: {}", targetDir);
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(
+                        String.format("Failed to create the dir: %s", targetDir), e);
+            }
+        }
+    }
+
+    public File fetch(String jarURI, String targetDirStr) throws Exception {
+        File targetDir = new File(targetDirStr);
+        createIfNotExists(targetDir);
+        URI uri = new URI(jarURI);
+        if ("http".equals(uri.getScheme()) || "https".equals(uri.getScheme())) {
+            return HttpArtifactFetcher.INSTANCE.fetch(jarURI, targetDir);
+        } else {
+            return FileSystemBasedArtifactFetcher.INSTANCE.fetch(jarURI, targetDir);
+        }
+    }
+
+    public String generateJarDir(FlinkDeployment flinkApp, FlinkSessionJob sessionJob) {

Review Comment:
   Oh yes, I will simplify it.





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#issuecomment-1101953837

   Thanks for your explanation, I see what you mean. We could support session jobs first and then inject an init container as a bridge until Flink itself is able to fetch remote jars.
   Besides, I remember there are already some tickets discussing how to let Flink ship files from remote filesystems in k8s mode. I think we can continue that work once this is done.
   
   https://issues.apache.org/jira/browse/FLINK-20867  Support ship files from local/remote path to k8s pod
   https://issues.apache.org/jira/browse/FLINK-26647  Can not add extra config files on native Kubernetes 
   https://issues.apache.org/jira/browse/FLINK-26897  Provide an additional way to add dependency files to k8s




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854745556


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java:
##########
@@ -97,4 +97,10 @@ public class OperatorConfigOptions {
                     .withDescription(
                             "The timeout for deployments to become ready/stable "
                                     + "before being rolled back if rollback is enabled.");
+
+    public static final ConfigOption<String> OPERATOR_USER_JAR_BASE_DIR =

Review Comment:
   OK, I have added the option to `configuration.md`.





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#issuecomment-1100661721

   Please take a look when you are free. cc @gyfora @wangyang0918




[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#issuecomment-1101318826

   @wangyang0918 Do you mean making Flink fetch artifacts from different source types in the `KubernetesApplicationClusterEntrypoint` and `KubernetesTaskExecutorRunner`? I think it's feasible, and it would indeed simplify application mode deployments.




[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#issuecomment-1104679360

   @wangyang0918 Thanks for your review. I have addressed your comments; please take another look.




[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854872589


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -75,6 +75,12 @@ under the License.
             <version>${flink.version}</version>
         </dependency>
 
+        <dependency>

Review Comment:
   What I mean is the transitive dependencies. Are they already included in the NOTICE file?





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854662627


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.core.fs.FileSystem;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/** Leverage the flink filesystem plugin to fetch the artifact. */
+public class FileSystemBasedArtifactFetcher implements ArtifactFetcher {
+
+    public static final Logger LOG = LoggerFactory.getLogger(FileSystemBasedArtifactFetcher.class);
+    public static final FileSystemBasedArtifactFetcher INSTANCE =
+            new FileSystemBasedArtifactFetcher();
+
+    @Override
+    public File fetch(String uri, File targetDir) throws Exception {
+        org.apache.flink.core.fs.Path source = new org.apache.flink.core.fs.Path(uri);
+        var start = System.currentTimeMillis();
+        FileSystem fileSystem = source.getFileSystem();
+        String fileName = source.getName();
+        File targetFile = new File(targetDir, fileName);
+        FileUtils.copyToFile(fileSystem.open(source), targetFile);

Review Comment:
   Nice catch, fixed



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+
+/** Download the jar from the http resource. */
+public class HttpArtifactFetcher implements ArtifactFetcher {
+
+    public static final Logger LOG = LoggerFactory.getLogger(HttpArtifactFetcher.class);
+    public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher();
+
+    @Override
+    public File fetch(String uri, File targetDir) throws Exception {
+        var start = System.currentTimeMillis();
+        URL url = new URL(uri);
+        String fileName = FilenameUtils.getName(url.getFile());
+        File targetFile = new File(targetDir, fileName);
+        FileUtils.copyToFile(new URL(uri).openStream(), targetFile);

Review Comment:
   fixed
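
The thread does not quote the reviewer's remark on this line or on the matching line in `FileSystemBasedArtifactFetcher`, so what follows is only one plausible reading: Commons IO's `FileUtils.copyToFile` leaves the source stream open, so the stream returned by `openStream()` is never closed. A minimal sketch under that assumption, using only JDK classes; `ClosingFetchSketch` is a hypothetical name, not the code that was merged.

```java
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch: download a URL into targetDir and always close the source stream. */
final class ClosingFetchSketch {

    private ClosingFetchSketch() {}

    static File fetch(String uri, File targetDir) throws Exception {
        URL url = URI.create(uri).toURL();
        // Use the last path segment of the URL as the local file name.
        String path = url.getPath();
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        File targetFile = new File(targetDir, fileName);
        // try-with-resources closes the stream even if the copy fails midway.
        try (InputStream in = url.openStream()) {
            Files.copy(in, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        return targetFile;
    }
}
```

The same try-with-resources shape would apply to the stream returned by `fileSystem.open(source)` in the filesystem-based fetcher.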





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854664914


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -270,6 +283,13 @@ under the License.
                             <version>${flink.version}</version>
                             <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
                         </artifactItem>
+                        <artifactItem>

Review Comment:
   As discussed on the mailing list, I think we should avoid bundling all the fs jars; it would make the image bigger and is not necessary. I intended to add it here as an example for users to follow, but I think I can remove it now and explain how to extend it in the docs.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r855749032


##########
docs/content/docs/operations/configuration.md:
##########
@@ -62,3 +62,4 @@ To learn more about metrics and logging configuration please refer to the dedica
 | kubernetes.operator.observer.flink.client.timeout     |     10s    |  Duration    | The timeout for the observer to wait the flink rest client to return.            |
 | kubernetes.operator.reconciler.flink.cancel.job.timeout     |     1min    |  Duration    | The timeout for the reconciler to wait for flink to cancel job.            |
 | kubernetes.operator.reconciler.flink.cluster.shutdown.timeout     |     60s    |  Duration    | The timeout for the reconciler to wait for flink to shutdown cluster.           |
+| kubernetes.operator.user.artifacts.base.dir     |     60s    |  String |     The base dir to put the session job artifacts.           |

Review Comment:
   Thanks, will fix.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854888469


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -270,6 +283,13 @@ under the License.
                             <version>${flink.version}</version>
                             <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
                         </artifactItem>
+                        <artifactItem>

Review Comment:
   Yes, I noted it in the description.





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#issuecomment-1106004593

   I have updated the PR, please take another look @wangyang0918




[GitHub] [flink-kubernetes-operator] wangyang0918 merged pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 merged PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168




[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r855741663


##########
docs/content/docs/operations/configuration.md:
##########
@@ -62,3 +62,4 @@ To learn more about metrics and logging configuration please refer to the dedica
 | kubernetes.operator.observer.flink.client.timeout     |     10s    |  Duration    | The timeout for the observer to wait the flink rest client to return.            |
 | kubernetes.operator.reconciler.flink.cancel.job.timeout     |     1min    |  Duration    | The timeout for the reconciler to wait for flink to cancel job.            |
 | kubernetes.operator.reconciler.flink.cluster.shutdown.timeout     |     60s    |  Duration    | The timeout for the reconciler to wait for flink to shutdown cluster.           |
+| kubernetes.operator.user.artifacts.base.dir     |     60s    |  String |     The base dir to put the session job artifacts.           |

Review Comment:
   The default value is not `60s`.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+
+/** Manage the user artifacts. */
+public class ArtifactManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ArtifactManager.class);
+    private final File baseDir;
+
+    public ArtifactManager(FlinkOperatorConfiguration operatorConfiguration) {
+        this.baseDir = new File(operatorConfiguration.getArtifactsBaseDir());
+    }
+
+    private synchronized void createIfNotExists(File targetDir) {
+        if (!targetDir.exists()) {
+            try {
+                FileUtils.forceMkdirParent(targetDir);
+                LOG.info("Created dir: {}", targetDir);
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(
+                        String.format("Failed to create the dir: %s", targetDir), e);
+            }
+        }
+    }
+
+    public File fetch(String jarURI, String targetDirStr) throws Exception {
+        File targetDir = new File(targetDirStr);
+        createIfNotExists(targetDir);
+        URI uri = new URI(jarURI);
+        if ("http".equals(uri.getScheme()) || "https".equals(uri.getScheme())) {
+            return HttpArtifactFetcher.INSTANCE.fetch(jarURI, targetDir);
+        } else {
+            return FileSystemBasedArtifactFetcher.INSTANCE.fetch(jarURI, targetDir);
+        }
+    }
+
+    public String generateJarDir(FlinkDeployment flinkApp, FlinkSessionJob sessionJob) {

Review Comment:
   We need to enforce that the session cluster and its session jobs are in the same namespace, right?
   
   Then maybe `baseDir` + `/` + `sessionJob.getMetadata().getNamespace()` + `/` + `sessionJob.getSpec().getClusterId()` + `/` + `sessionJob.getMetadata().getName()` is enough.
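
The layout suggested above can be sketched as follows. This is an illustration only: `JarDirLayout` is a hypothetical name, and the plain `String` parameters stand in for the values read from `sessionJob.getMetadata().getNamespace()`, `sessionJob.getSpec().getClusterId()` and `sessionJob.getMetadata().getName()`.

```java
import java.nio.file.Paths;

/** Hypothetical sketch of the baseDir/namespace/clusterId/jobName layout. */
final class JarDirLayout {

    private JarDirLayout() {}

    static String generateJarDir(
            String baseDir, String namespace, String clusterId, String jobName) {
        // Paths.get joins the segments with the platform-specific separator.
        return Paths.get(baseDir, namespace, clusterId, jobName).toString();
    }
}
```

Because the namespace is taken from the session job alone, this layout relies on the cluster and the job living in the same namespace, which is the constraint raised in the comment.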





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854742144


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java:
##########
@@ -97,4 +97,10 @@ public class OperatorConfigOptions {
                     .withDescription(
                             "The timeout for deployments to become ready/stable "
                                     + "before being rolled back if rollback is enabled.");
+
+    public static final ConfigOption<String> OPERATOR_USER_JAR_BASE_DIR =

Review Comment:
   Not in this PR, but we could improve this by generating the documentation automatically, just like in Flink.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854741545


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.config.OperatorConfigOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.file.Path;
+
+/** Test for {@link ArtifactManager}. */
+public class ArtifactManagerTest {
+
+    @TempDir Path tempDir;
+    private ArtifactManager artifactManager;
+
+    @BeforeEach
+    public void setup() {
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                OperatorConfigOptions.OPERATOR_USER_JAR_BASE_DIR,
+                tempDir.toAbsolutePath().toString());
+        artifactManager =
+                new ArtifactManager(FlinkOperatorConfiguration.fromConfiguration(configuration));
+    }
+
+    @Test
+    public void testGenerateJarDir() {
+        String baseDir =
+                artifactManager.generateJarDir(
+                        TestUtils.buildSessionCluster(), TestUtils.buildSessionJob());
+        String expected =
+                tempDir.toString()
+                        + File.separator
+                        + TestUtils.TEST_NAMESPACE
+                        + File.separator
+                        + TestUtils.TEST_DEPLOYMENT_NAME
+                        + File.separator
+                        + TestUtils.TEST_SESSION_JOB_NAME;
+        Assertions.assertEquals(expected, baseDir);
+    }
+
+    @Test
+    public void testFilesystemFetch() throws Exception {
+        var sourceFile = mockTheJarFile();
+        File file =
+                artifactManager.fetch(
+                        String.format("file://%s", sourceFile.getAbsolutePath()),
+                        tempDir.toString());
+        Assertions.assertTrue(file.exists());
+        Assertions.assertEquals(tempDir.toString(), file.getParentFile().toString());
+    }
+
+    @Test
+    public void testHttpFetch() throws Exception {
+        HttpServer httpServer = null;
+        try {
+            httpServer = HttpServer.create(new InetSocketAddress(1234), 0);

Review Comment:
   Fixed by starting the server with retry.
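
   The actual fix is not shown in this thread. A minimal sketch of what "start with retry" could look like, using only the JDK's built-in `HttpServer`, follows. `StartWithRetrySketch`, `startWithRetry`, and the port range 20000-30000 are hypothetical choices for illustration, not names from the PR.

   ```java
   import com.sun.net.httpserver.HttpServer;

   import java.io.IOException;
   import java.net.BindException;
   import java.net.InetSocketAddress;
   import java.util.concurrent.ThreadLocalRandom;

   /** Hypothetical sketch, not the code from the PR. */
   public class StartWithRetrySketch {

       /** Tries random ports until one can be bound, or gives up after maxAttempts. */
       public static HttpServer startWithRetry(int maxAttempts) throws IOException {
           BindException last = null;
           for (int attempt = 0; attempt < maxAttempts; attempt++) {
               // Assumed port range; any range that is usually free on the CI host works.
               int port = ThreadLocalRandom.current().nextInt(20000, 30000);
               try {
                   HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
                   server.start();
                   return server;
               } catch (BindException e) {
                   // The port is taken, remember the failure and try another one.
                   last = e;
               }
           }
           throw new IOException("No free port found after " + maxAttempts + " attempts", last);
       }

       public static void main(String[] args) throws IOException {
           HttpServer server = startWithRetry(10);
           try {
               System.out.println("Listening on port " + server.getAddress().getPort());
           } finally {
               server.stop(0);
           }
       }
   }
   ```

   The test would then build its URL from `server.getAddress().getPort()` instead of a hard-coded `1234`.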





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#issuecomment-1101282826

   Before starting the review, I am wondering whether the application cluster could also benefit from this. Users would then not need to use the pod template to download the user jars if remote jars are supported internally.
   
   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     namespace: default
     name: basic-example
   spec:
     image: flink:1.14
     flinkVersion: v1_14
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
     ... ...
     job:
       jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.3/flink-examples-streaming_2.12-1.14.3.jar
       parallelism: 2
       upgradeMode: stateless
   ```




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854663876


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -135,6 +141,13 @@ under the License.
             <version>${fabric8.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   Nice catch, removed





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854726992


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java:
##########
@@ -97,4 +97,10 @@ public class OperatorConfigOptions {
                     .withDescription(
                             "The timeout for deployments to become ready/stable "
                                     + "before being rolled back if rollback is enabled.");
+
+    public static final ConfigOption<String> OPERATOR_USER_JAR_BASE_DIR =

Review Comment:
   Added





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854667241


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java:
##########
@@ -97,4 +97,10 @@ public class OperatorConfigOptions {
                     .withDescription(
                             "The timeout for deployments to become ready/stable "
                                     + "before being rolled back if rollback is enabled.");
+
+    public static final ConfigOption<String> OPERATOR_USER_JAR_BASE_DIR =

Review Comment:
   Does the option change have to be copied to `configuration.md` manually?





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854742318


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -270,6 +283,13 @@ under the License.
                             <version>${flink.version}</version>
                             <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
                         </artifactItem>
+                        <artifactItem>

Review Comment:
   Removed





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854663797


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -204,12 +209,14 @@ private JarRunResponseBody runJar(
         }
     }
 
-    private JarUploadResponseBody uploadJar(FlinkSessionJob sessionJob, Configuration conf)
+    private JarUploadResponseBody uploadJar(
+            FlinkSessionJob sessionJob, FlinkDeployment sessionCluster, Configuration conf)
             throws Exception {
-        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        String targetDir = artifactManager.generateJarDir(sessionCluster, sessionJob);
+        File jarFile = artifactManager.fetch(sessionJob.getSpec().getJob().getJarURI(), targetDir);

Review Comment:
   I also thought about it. The good thing is that the local filesystem needs no special handling in `fetch` and `delete` (without the copy, we could not delete the jar after submission either). Do you think we need to handle the difference now?
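
   If the two cases were handled differently, the decision could hinge on the URI scheme. The sketch below only illustrates that idea under stated assumptions: `FetchDecisionSketch`, `Source`, `classify`, and `ownsFetchedCopy` are hypothetical names, and the PR as discussed here copies every jar regardless of scheme.

   ```java
   import java.net.URI;

   /** Hypothetical sketch, not the code from the PR. */
   public class FetchDecisionSketch {

       /** Where a jar URI points to, judged by its scheme only. */
       public enum Source {
           HTTP,
           LOCAL_FILE,
           OTHER_FILESYSTEM
       }

       public static Source classify(String jarUri) {
           String scheme = URI.create(jarUri).getScheme();
           if ("http".equals(scheme) || "https".equals(scheme)) {
               return Source.HTTP;
           }
           // A missing scheme is treated as a plain local path in this sketch.
           if (scheme == null || "file".equals(scheme)) {
               return Source.LOCAL_FILE;
           }
           // Everything else (for example oss://, s3://, hdfs://) would go through a filesystem plugin.
           return Source.OTHER_FILESYSTEM;
       }

       /** Only a copy made by the operator may be deleted after submission. */
       public static boolean ownsFetchedCopy(String jarUri) {
           return classify(jarUri) != Source.LOCAL_FILE;
       }

       public static void main(String[] args) {
           System.out.println(classify("file:///tmp/a.jar"));
           System.out.println(classify("https://example.org/a.jar"));
           System.out.println(ownsFetchedCopy("oss://bucket/a.jar"));
       }
   }
   ```

   Always copying, as the PR does, avoids this branch entirely: `delete` can remove whatever `fetch` produced without checking where it came from.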





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854747142


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -270,6 +283,13 @@ under the License.
                             <version>${flink.version}</version>
                             <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
                         </artifactItem>
+                        <artifactItem>

Review Comment:
   Got it. I will add it in the follow-up doc ticket.





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854883516


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -270,6 +283,13 @@ under the License.
                             <version>${flink.version}</version>
                             <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
                         </artifactItem>
+                        <artifactItem>

Review Comment:
   We could just add the documentation in FLINK-27270.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854888469


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -270,6 +283,13 @@ under the License.
                             <version>${flink.version}</version>
                             <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
                         </artifactItem>
+                        <artifactItem>

Review Comment:
   Yes





[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#issuecomment-1101319990

   Do you mean to make Flink fetch artifacts of different source types in the `KubernetesApplicationClusterEntrypoint` and `KubernetesTaskExecutorRunner`? I think it's feasible, and it could indeed simplify the use of application mode deployments.




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854753397


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java:
##########
@@ -97,4 +97,10 @@ public class OperatorConfigOptions {
                     .withDescription(
                             "The timeout for deployments to become ready/stable "
                                     + "before being rolled back if rollback is enabled.");
+
+    public static final ConfigOption<String> OPERATOR_USER_JAR_BASE_DIR =

Review Comment:
   > Not in this PR, but we could improve to generate the document automatically just like in Flink.
   
   Created a ticket for this https://issues.apache.org/jira/browse/FLINK-27334





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r853929995


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+
+/** Download the jar from the http resource. */
+public class HttpArtifactFetcher implements ArtifactFetcher {
+
+    public static final Logger LOG = LoggerFactory.getLogger(HttpArtifactFetcher.class);
+    public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher();
+
+    @Override
+    public File fetch(String uri, File targetDir) throws Exception {
+        var start = System.currentTimeMillis();
+        URL url = new URL(uri);
+        String fileName = FilenameUtils.getName(url.getFile());
+        File targetFile = new File(targetDir, fileName);
+        FileUtils.copyToFile(new URL(uri).openStream(), targetFile);

Review Comment:
   Same as above.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.core.fs.FileSystem;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/** Leverage the flink filesystem plugin to fetch the artifact. */
+public class FileSystemBasedArtifactFetcher implements ArtifactFetcher {
+
+    public static final Logger LOG = LoggerFactory.getLogger(FileSystemBasedArtifactFetcher.class);
+    public static final FileSystemBasedArtifactFetcher INSTANCE =
+            new FileSystemBasedArtifactFetcher();
+
+    @Override
+    public File fetch(String uri, File targetDir) throws Exception {
+        org.apache.flink.core.fs.Path source = new org.apache.flink.core.fs.Path(uri);
+        var start = System.currentTimeMillis();
+        FileSystem fileSystem = source.getFileSystem();
+        String fileName = source.getName();
+        File targetFile = new File(targetDir, fileName);
+        FileUtils.copyToFile(fileSystem.open(source), targetFile);

Review Comment:
   It seems that the opened stream is not closed correctly.
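
   A minimal sketch of one way to address this with the JDK alone: open the source inside try-with-resources so it is closed on both the success and the failure path. `StreamCopySketch` and `copyAndClose` are hypothetical names, and the plain `InputStream` stands in for whatever `fileSystem.open(source)` or `new URL(uri).openStream()` returns.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   /** Hypothetical helper, not part of the PR. */
   public class StreamCopySketch {

       /** Copies the stream to the target file and always closes the stream. */
       public static long copyAndClose(InputStream source, Path target) throws IOException {
           // try-with-resources closes `in` even if the copy throws.
           try (InputStream in = source) {
               return Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
           }
       }

       public static void main(String[] args) throws IOException {
           Path target = Files.createTempFile("artifact", ".jar");
           long copied = copyAndClose(new ByteArrayInputStream(new byte[] {1, 2, 3}), target);
           System.out.println("Copied " + copied + " bytes to " + target);
           Files.delete(target);
       }
   }
   ```

   The same pattern works with `FileUtils.copyToFile` by wrapping the opened stream in try-with-resources. Whether commons-io closes the stream itself depends on the method and version, so that is worth checking against its Javadoc rather than assuming.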



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.artifact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.config.OperatorConfigOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.file.Path;
+
+/** Test for {@link ArtifactManager}. */
+public class ArtifactManagerTest {
+
+    @TempDir Path tempDir;
+    private ArtifactManager artifactManager;
+
+    @BeforeEach
+    public void setup() {
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                OperatorConfigOptions.OPERATOR_USER_JAR_BASE_DIR,
+                tempDir.toAbsolutePath().toString());
+        artifactManager =
+                new ArtifactManager(FlinkOperatorConfiguration.fromConfiguration(configuration));
+    }
+
+    @Test
+    public void testGenerateJarDir() {
+        String baseDir =
+                artifactManager.generateJarDir(
+                        TestUtils.buildSessionCluster(), TestUtils.buildSessionJob());
+        String expected =
+                tempDir.toString()
+                        + File.separator
+                        + TestUtils.TEST_NAMESPACE
+                        + File.separator
+                        + TestUtils.TEST_DEPLOYMENT_NAME
+                        + File.separator
+                        + TestUtils.TEST_SESSION_JOB_NAME;
+        Assertions.assertEquals(expected, baseDir);
+    }
+
+    @Test
+    public void testFilesystemFetch() throws Exception {
+        var sourceFile = mockTheJarFile();
+        File file =
+                artifactManager.fetch(
+                        String.format("file://%s", sourceFile.getAbsolutePath()),
+                        tempDir.toString());
+        Assertions.assertTrue(file.exists());
+        Assertions.assertEquals(tempDir.toString(), file.getParentFile().toString());
+    }
+
+    @Test
+    public void testHttpFetch() throws Exception {
+        HttpServer httpServer = null;
+        try {
+            httpServer = HttpServer.create(new InetSocketAddress(1234), 0);

Review Comment:
   The test will be unstable when port `1234` is not available or when tests run concurrently.
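
   One common way to avoid a fixed port, sketched here as an alternative and not as what the PR ended up doing, is to bind to port `0` and let the OS pick a free one. `EphemeralPortSketch`, `startServer`, and the `/test.jar` path are hypothetical names for illustration.

   ```java
   import com.sun.net.httpserver.HttpServer;

   import java.io.IOException;
   import java.io.InputStream;
   import java.io.OutputStream;
   import java.net.InetSocketAddress;
   import java.net.URL;
   import java.nio.charset.StandardCharsets;

   /** Hypothetical sketch, not the code from the PR. */
   public class EphemeralPortSketch {

       /** Starts a server on an OS-assigned free port that serves body at path. */
       public static HttpServer startServer(String path, byte[] body) throws IOException {
           // Port 0 asks the OS for any free port, so concurrent tests do not collide.
           HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
           server.createContext(
                   path,
                   exchange -> {
                       exchange.sendResponseHeaders(200, body.length);
                       try (OutputStream out = exchange.getResponseBody()) {
                           out.write(body);
                       }
                   });
           server.start();
           return server;
       }

       public static void main(String[] args) throws IOException {
           byte[] body = "fake-jar".getBytes(StandardCharsets.UTF_8);
           HttpServer server = startServer("/test.jar", body);
           try {
               // Read back the port that was actually bound.
               int port = server.getAddress().getPort();
               URL url = new URL("http://127.0.0.1:" + port + "/test.jar");
               try (InputStream in = url.openStream()) {
                   System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
               }
           } finally {
               server.stop(0);
           }
       }
   }
   ```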



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java:
##########
@@ -97,4 +97,10 @@ public class OperatorConfigOptions {
                     .withDescription(
                             "The timeout for deployments to become ready/stable "
                                     + "before being rolled back if rollback is enabled.");
+
+    public static final ConfigOption<String> OPERATOR_USER_JAR_BASE_DIR =

Review Comment:
   We might need to update the docs.



##########
flink-kubernetes-operator/pom.xml:
##########
@@ -135,6 +141,13 @@ under the License.
             <version>${fabric8.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   It seems that we do not need this dependency.



##########
flink-kubernetes-operator/pom.xml:
##########
@@ -75,6 +75,12 @@ under the License.
             <version>${flink.version}</version>
         </dependency>
 
+        <dependency>

Review Comment:
   We might need to update the NOTICE file when introducing a new dependency.



##########
flink-kubernetes-operator/pom.xml:
##########
@@ -270,6 +283,13 @@ under the License.
                             <version>${flink.version}</version>
                             <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
                         </artifactItem>
+                        <artifactItem>

Review Comment:
   It is strange to add only the oss filesystem here. We need to bundle either all the internal fs implementations or none.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -204,12 +209,14 @@ private JarRunResponseBody runJar(
         }
     }
 
-    private JarUploadResponseBody uploadJar(FlinkSessionJob sessionJob, Configuration conf)
+    private JarUploadResponseBody uploadJar(
+            FlinkSessionJob sessionJob, FlinkDeployment sessionCluster, Configuration conf)
             throws Exception {
-        Path path = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        String targetDir = artifactManager.generateJarDir(sessionCluster, sessionJob);
+        File jarFile = artifactManager.fetch(sessionJob.getSpec().getJob().getJarURI(), targetDir);

Review Comment:
   If the user-specified jar is a local file such as `file:///tmp/a.jar`, do we still need to copy it to `/opt/flink/artifacts`?





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#issuecomment-1101934289

   @Aitozi It would be great, and would make things easier, if upstream Flink could support remote user jars directly. Thanks to the YARN distributed cache, the YARN application mode already supports HDFS user jars. But this is not the top priority, since we could get this done in the flink-kubernetes-operator project first.
   
   When users specify a remote user jar via `.spec.job.jarURI`, the operator could inject an init-container to download the jars automatically.
   
   Let's narrow down the scope of FLINK-27161 to only support fetching remote user jars for session jobs, and create a follow-up ticket to support remote user jars for application clusters. I will start to review this PR today.




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854662879


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java:
##########
@@ -97,4 +97,10 @@ public class OperatorConfigOptions {
                     .withDescription(
                             "The timeout for deployments to become ready/stable "
                                     + "before being rolled back if rollback is enabled.");
+
+    public static final ConfigOption<String> OPERATOR_USER_JAR_BASE_DIR =

Review Comment:
   Yes, but the CI does not seem to catch it 😄





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854745708


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -270,6 +283,13 @@ under the License.
                             <version>${flink.version}</version>
                             <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
                         </artifactItem>
+                        <artifactItem>

Review Comment:
   Yes. We could remove it now and document it with `kustomize`.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #168: [FLINK-27161] Support fetch user jar from different source

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #168:
URL: https://github.com/apache/flink-kubernetes-operator/pull/168#discussion_r854745008


##########
flink-kubernetes-operator/pom.xml:
##########
@@ -75,6 +75,12 @@ under the License.
             <version>${flink.version}</version>
         </dependency>
 
+        <dependency>

Review Comment:
   `flink-core` seems to be already included in the NOTICE file:
   
   ```
   - org.apache.flink:flink-core:1.14.4
   ```


