Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/05 20:03:43 UTC

[GitHub] [incubator-seatunnel] liugddx opened a new pull request, #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

liugddx opened a new pull request, #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982

   close #2975
   
   ## Purpose of this pull request
   
   Later versions of Flink do not allow setting the `Configuration` directly, because the `Configuration` is unmodifiable. Constructors are more stable, so this PR sets the configuration (including `pipeline.jars`) via the constructor instead.
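   A minimal sketch of the build-then-construct pattern described above, using a plain `Map` as a stand-in for Flink's `Configuration`. Every option, including the jar list, is collected first and passed to the constructor once. After that, the settings cannot be changed. `EnvSettings` and `withJars` are hypothetical names, not SeaTunnel or Flink APIs. The `;` separator matches how the PR's `parseConfig` splits `pipeline.jars`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an environment whose settings are frozen at construction time. */
final class EnvSettings {
    private final Map<String, String> options;

    EnvSettings(Map<String, String> options) {
        // Copy, then wrap: later changes to the caller's map are not visible,
        // and nobody can mutate the settings through options().
        this.options = Collections.unmodifiableMap(new LinkedHashMap<>(options));
    }

    Map<String, String> options() {
        return options;
    }

    /** Gathers all options (base settings plus jars) first, then constructs once. */
    static EnvSettings withJars(Map<String, String> base, List<String> jarUrls) {
        Map<String, String> opts = new LinkedHashMap<>(base);
        // Jar URLs are joined with ';', the separator the PR splits on.
        opts.put("pipeline.jars", String.join(";", jarUrls));
        return new EnvSettings(opts);
    }
}
```

   Calling `options().put(...)` on the result throws `UnsupportedOperationException`. This mirrors the unmodifiable `Configuration` that motivates moving the jar list into the constructor.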
   
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or they do not need tests for the following reason:
   * [ ] If any new JAR binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#discussion_r1003041191


##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);
+    }
+
+    private Config parseConfig(Config config, String path, List<URL> jars) {

Review Comment:
   The name should change too. From the name alone, it is not clear what this method does.



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -50,15 +57,29 @@ public class FlinkExecution implements TaskExecution {
     private final PluginExecuteProcessor sourcePluginExecuteProcessor;
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
+    private final List<URL> jarPaths;
 
     public FlinkExecution(Config config) {
-        this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
-        JobContext jobContext = new JobContext();
-        jobContext.setJobMode(flinkEnvironment.getJobMode());
+        try {
+            jarPaths = new ArrayList<>(Collections.singletonList(
+                new File(Common.appLibDir().resolve(FlinkStarter.APP_JAR_NAME).toString()).toURI().toURL()));
+        } catch (MalformedURLException e) {
+            throw new SeaTunnelException("load flink starter error.", e);
+        }
         registerPlugin();
-        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SOURCE));
-        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.TRANSFORM));
-        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SINK));
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(new FlinkEnvironmentFactory(config).getJobMode(config.getConfig("env")));
+
+        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
+        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(jarPaths, config.getConfigList(Constants.TRANSFORM), jobContext);
+        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(jarPaths, config.getConfigList(Constants.SINK), jobContext);
+
+        this.flinkEnvironment = new FlinkEnvironmentFactory(this.registerPlugin(config, jarPaths)).getEnvironment();

Review Comment:
   Don't create `FlinkEnvironmentFactory` twice
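   One possible way to follow this suggestion, sketched with hypothetical `EnvFactory` and `ExecutionSketch` classes over a plain `Map` config:
   
   1. Read the job mode straight from the `env` config rather than from a factory.
   2. Let the processors add their jars.
   3. Build the factory once, from the final config.
   
   The `job.mode` key and the `BATCH` default are assumptions of this sketch and do not come from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical environment factory; counts constructions so "only once" can be checked. */
final class EnvFactory {
    static int created = 0;
    private final Map<String, String> env;

    EnvFactory(Map<String, String> env) {
        created++;
        this.env = new LinkedHashMap<>(env);
    }

    String pipelineJars() {
        return env.get("pipeline.jars");
    }
}

/** Hypothetical execution: job mode comes from the raw config, the factory is built once at the end. */
final class ExecutionSketch {
    final String jobMode;
    final List<String> jarPaths = new ArrayList<>();
    final EnvFactory factory;

    ExecutionSketch(Map<String, String> env, List<String> pluginJars) {
        // 1. No factory is needed just to learn the job mode.
        this.jobMode = jobModeOf(env);
        // 2. Processors would register their plugin jars here (simulated by addAll).
        jarPaths.addAll(pluginJars);
        // 3. Build the final config, then create the factory exactly once.
        Map<String, String> finalEnv = new LinkedHashMap<>(env);
        finalEnv.put("pipeline.jars", String.join(";", jarPaths));
        this.factory = new EnvFactory(finalEnv);
    }

    static String jobModeOf(Map<String, String> env) {
        // "job.mode" and the BATCH default are assumptions of this sketch.
        return env.getOrDefault("job.mode", "BATCH");
    }
}
```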



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -50,15 +57,29 @@ public class FlinkExecution implements TaskExecution {
     private final PluginExecuteProcessor sourcePluginExecuteProcessor;
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
+    private final List<URL> jarPaths;
 
     public FlinkExecution(Config config) {
-        this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
-        JobContext jobContext = new JobContext();
-        jobContext.setJobMode(flinkEnvironment.getJobMode());
+        try {
+            jarPaths = new ArrayList<>(Collections.singletonList(

Review Comment:
   Use a `Set` to avoid repeated jar paths.
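   A minimal sketch of a duplicate-free, insertion-ordered jar list. It assumes the starter jar should stay first, as in the `Collections.singletonList(...)` initialization above. `JarPaths` is a hypothetical class, and `seatunnel-flink-starter.jar` is a placeholder file name. Entries are keyed by `URL.toExternalForm()` because `java.net.URL.equals` and `hashCode` may perform host name resolution. The sketch also converts a `Path` with `toUri().toURL()`, which avoids the detour through `new File(path.toString())`.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical ordered, duplicate-free collection of jar URLs. */
final class JarPaths {
    // Keyed by external form so duplicates are detected without URL.equals/hashCode.
    private final Map<String, URL> byForm = new LinkedHashMap<>();

    /** Converts a local jar path to a file: URL, wrapping the checked exception. */
    static URL toUrl(Path jar) {
        try {
            return jar.toAbsolutePath().toUri().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid jar path: " + jar, e);
        }
    }

    /** Adds the URL if it is not already present; returns true if it was new. */
    boolean add(URL url) {
        return byForm.putIfAbsent(url.toExternalForm(), url) == null;
    }

    void addAll(Collection<URL> urls) {
        urls.forEach(this::add);
    }

    /** Snapshot in insertion order (first-added jar first). */
    List<URL> asList() {
        return new ArrayList<>(byForm.values());
    }
}
```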



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);
+    }
+
+    private Config parseConfig(Config config, String path, List<URL> jars) {
+
+        if (config.hasPath(path)) {
+            List<URL> paths = Arrays.stream(config.getString(path).split(";")).map(uri -> {

Review Comment:
   Use a `Set` to avoid repeated jar paths.
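   A sketch of how the split-and-merge in `parseConfig` could look with a `Set`. It assumes the existing value is a `;`-separated list of absolute URIs, since the diff splits on `;`. `PipelineJars`, `merge`, and `parse` are hypothetical names. The sketch behaves as follows:
   
   - Entries from the user's config keep their order and come first.
   - Repeated jars are dropped.
   - A malformed entry fails with an `IllegalArgumentException` that names it.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper: merge a ';'-separated jar list with additional jar URLs. */
final class PipelineJars {
    static String merge(String existing, Collection<URL> jars) {
        // LinkedHashSet keeps first-seen order and silently drops repeats.
        Set<String> merged = new LinkedHashSet<>();
        if (existing != null) {
            for (String entry : existing.split(";")) {
                String trimmed = entry.trim();
                if (!trimmed.isEmpty()) {
                    // Normalize through URL so equivalent spellings compare equal.
                    merged.add(parse(trimmed).toExternalForm());
                }
            }
        }
        for (URL jar : jars) {
            merged.add(jar.toExternalForm());
        }
        return String.join(";", merged);
    }

    /** Parses one absolute URI into a URL, reporting the offending entry on failure. */
    static URL parse(String uri) {
        try {
            return URI.create(uri).toURL();
        } catch (IllegalArgumentException | MalformedURLException e) {
            throw new IllegalArgumentException("Invalid jar URL in pipeline.jars: " + uri, e);
        }
    }
}
```

   For example, merging `file:/opt/a.jar; file:/opt/b.jar` with `file:/opt/b.jar` and `file:/opt/c.jar` yields `file:/opt/a.jar;file:/opt/b.jar;file:/opt/c.jar`.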



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {

Review Comment:
   Maybe it should be named `injectJarsToConfig`.
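   A sketch of the method under the suggested name. It assumes a flat `Map` keyed by config path stands in for the `Config` object, and it returns a new map instead of mutating its input. Typesafe Config objects are immutable in the same way: `Config.withValue` returns a new instance. The method writes the merged jar list to both `env.pipeline.jars` and `env.pipeline.classpaths`, the two paths the diff passes to `parseConfig`. `JarInjector` is a hypothetical class.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical version of the renamed method, over a flat path -> value map instead of a Config. */
final class JarInjector {
    // The same two paths the diff builds with ConfigUtil.joinPath.
    static final List<String> JAR_PATHS = Arrays.asList("env.pipeline.jars", "env.pipeline.classpaths");

    static Map<String, String> injectJarsToConfig(Map<String, String> config, List<String> jarUrls) {
        // Work on a copy so the caller's config is left untouched.
        Map<String, String> result = new LinkedHashMap<>(config);
        for (String path : JAR_PATHS) {
            // Existing user entries first, then new jars; the Set drops repeats.
            Set<String> merged = new LinkedHashSet<>();
            String existing = result.get(path);
            if (existing != null && !existing.isEmpty()) {
                merged.addAll(Arrays.asList(existing.split(";")));
            }
            merged.addAll(jarUrls);
            result.put(path, String.join(";", merged));
        }
        return Collections.unmodifiableMap(result);
    }
}
```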



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);

Review Comment:
   Please update the docs to tell users the new way to add jars. Also, this function should support Spark later.



[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

Hisoka-X commented on code in PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#discussion_r1003935852


##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);

Review Comment:
   But you support adding jars via `env.pipeline.jars` in the config.



[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

liugddx commented on code in PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#discussion_r1003935951


##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);

Review Comment:
   > Please update the docs to tell users the new way to add jars. Also, this function should support Spark later.
   
   Spark uses the command line, so I don't think it needs changing.
   



[GitHub] [incubator-seatunnel] liugddx commented on pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

liugddx commented on PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#issuecomment-1287797189

   @CalvinKirs @EricJoy2048 @ashulin @Hisoka-X This PR is a subtask of the Flink upgrade. For details, see #2975.


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

Hisoka-X commented on PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#issuecomment-1292906135

   > There seems to be a problem, too.
   
   We will fix it.


[GitHub] [incubator-seatunnel] Hisoka-X merged pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

Hisoka-X merged PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982


[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

liugddx commented on code in PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#discussion_r1003940231


##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);

Review Comment:
   > But you support adding jars via `env.pipeline.jars` in the config.
   
   This is only used by Flink. Does Spark need this approach too?
   



[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

Hisoka-X commented on PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#issuecomment-1290508881

   Retry CI after #3183 merged


[GitHub] [incubator-seatunnel] liugddx commented on pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

liugddx commented on PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#issuecomment-1291443300

   There seems to be a problem, too.
   
   > Retry CI after #3183 merged


[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

liugddx commented on code in PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#discussion_r1003940884


##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);

Review Comment:
   > But you support adding jars via `env.pipeline.jars` in the config.
   
   I see.



[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

liugddx commented on code in PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#discussion_r1003935259


##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);

Review Comment:
   > Please update the docs to tell users the new way to add jars. Also, this function should support Spark later.
   
   This change does not require any change in usage. Only the way the jars are injected has changed.



[GitHub] [incubator-seatunnel] liugddx commented on pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

liugddx commented on PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#issuecomment-1289895427

   Done. @Hisoka-X 

