You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/24 08:42:24 UTC

[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2982: [Bug] [seatunnel-api-flink] Use the constructor instead of setting the pipeline.jars

Hisoka-X commented on code in PR #2982:
URL: https://github.com/apache/incubator-seatunnel/pull/2982#discussion_r1003041191


##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);
+    }
+
+    private Config parseConfig(Config config, String path, List<URL> jars) {

Review Comment:
   The name should be changed too; it is impossible to tell what this method does just from reading its name.



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -50,15 +57,29 @@ public class FlinkExecution implements TaskExecution {
     private final PluginExecuteProcessor sourcePluginExecuteProcessor;
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
+    private final List<URL> jarPaths;
 
     public FlinkExecution(Config config) {
-        this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
-        JobContext jobContext = new JobContext();
-        jobContext.setJobMode(flinkEnvironment.getJobMode());
+        try {
+            jarPaths = new ArrayList<>(Collections.singletonList(
+                new File(Common.appLibDir().resolve(FlinkStarter.APP_JAR_NAME).toString()).toURI().toURL()));
+        } catch (MalformedURLException e) {
+            throw new SeaTunnelException("load flink starter error.", e);
+        }
         registerPlugin();
-        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SOURCE));
-        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.TRANSFORM));
-        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SINK));
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(new FlinkEnvironmentFactory(config).getJobMode(config.getConfig("env")));
+
+        this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
+        this.transformPluginExecuteProcessor = new TransformExecuteProcessor(jarPaths, config.getConfigList(Constants.TRANSFORM), jobContext);
+        this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(jarPaths, config.getConfigList(Constants.SINK), jobContext);
+
+        this.flinkEnvironment = new FlinkEnvironmentFactory(this.registerPlugin(config, jarPaths)).getEnvironment();

Review Comment:
   Don't create `FlinkEnvironmentFactory` twice



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -50,15 +57,29 @@ public class FlinkExecution implements TaskExecution {
     private final PluginExecuteProcessor sourcePluginExecuteProcessor;
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
+    private final List<URL> jarPaths;
 
     public FlinkExecution(Config config) {
-        this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
-        JobContext jobContext = new JobContext();
-        jobContext.setJobMode(flinkEnvironment.getJobMode());
+        try {
+            jarPaths = new ArrayList<>(Collections.singletonList(

Review Comment:
   Use a `Set` to avoid repeated jar paths.



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);
+    }
+
+    private Config parseConfig(Config config, String path, List<URL> jars) {
+
+        if (config.hasPath(path)) {
+            List<URL> paths = Arrays.stream(config.getString(path).split(";")).map(uri -> {

Review Comment:
   Use a `Set` to avoid repeated jar paths.



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {

Review Comment:
   Maybe this should be named `injectJarsToConfig`.



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
 
         pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
 
-        flinkEnvironment.registerPlugin(pluginsJarDependencies);
+        jarPaths.addAll(pluginsJarDependencies);
+    }
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+        config = this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
+        return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);

Review Comment:
   Update the documentation to tell users about the new way to add jars. Also, this function should support Spark later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org