You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/08 09:24:58 UTC

[incubator-seatunnel] branch dev updated: [Core][Starter] Fix same source and sink register plugin set twice (#2151)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new eb179cf2f [Core][Starter] Fix same source and sink register plugin set twice (#2151)
eb179cf2f is described below

commit eb179cf2fb05e0c1c8d5848c89e901dfc2b7fb7e
Author: Hisoka <fa...@qq.com>
AuthorDate: Fri Jul 8 17:24:52 2022 +0800

    [Core][Starter] Fix same source and sink register plugin set twice (#2151)
---
 .../apache/seatunnel/core/flink/config/FlinkExecutionContext.java | 8 +++++---
 .../main/java/org/apache/seatunnel/core/spark/SparkStarter.java   | 4 +++-
 .../apache/seatunnel/core/spark/config/SparkExecutionContext.java | 8 +++++---
 .../core/starter/flink/execution/SinkExecuteProcessor.java        | 2 +-
 .../core/starter/flink/execution/SourceExecuteProcessor.java      | 6 ++++--
 .../core/starter/flink/execution/TransformExecuteProcessor.java   | 2 +-
 .../org/apache/seatunnel/core/starter/spark/SparkStarter.java     | 4 +++-
 .../core/starter/spark/execution/SinkExecuteProcessor.java        | 2 +-
 .../core/starter/spark/execution/SourceExecuteProcessor.java      | 6 ++++--
 .../core/starter/spark/execution/TransformExecuteProcessor.java   | 2 +-
 10 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
index 78fa9b71d..2acb83699 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -33,7 +33,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnvironment> {
@@ -47,12 +49,12 @@ public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnviron
         this.flinkSourcePluginDiscovery = new FlinkSourcePluginDiscovery();
         this.flinkTransformPluginDiscovery = new FlinkTransformPluginDiscovery();
         this.flinkSinkPluginDiscovery = new FlinkSinkPluginDiscovery();
-        List<URL> pluginJars = new ArrayList<>();
+        Set<URL> pluginJars = new HashSet<>();
         // since we didn't split the transform plugin jars, we just need to register the source/sink plugin jars
         pluginJars.addAll(flinkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
         pluginJars.addAll(flinkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
-        this.pluginJars = pluginJars;
-        this.getEnvironment().registerPlugin(pluginJars);
+        this.pluginJars = new ArrayList<>(pluginJars);
+        this.getEnvironment().registerPlugin(this.pluginJars);
     }
 
     @Override
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 0fd70b629..e8c9855f1 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -50,9 +50,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -213,7 +215,7 @@ public class SparkStarter implements Starter {
             return Collections.emptyList();
         }
         Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
-        List<URL> pluginJars = new ArrayList<>();
+        Set<URL> pluginJars = new HashSet<>();
         SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
         SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
         pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
index 80fd61adc..07854b83c 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -33,7 +33,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class SparkExecutionContext extends AbstractExecutionContext<SparkEnvironment> {
@@ -47,11 +49,11 @@ public class SparkExecutionContext extends AbstractExecutionContext<SparkEnviron
         this.sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
         this.sparkTransformPluginDiscovery = new SparkTransformPluginDiscovery();
         this.sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
-        List<URL> pluginJars = new ArrayList<>();
+        Set<URL> pluginJars = new HashSet<>();
         pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
         pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
-        this.pluginJars = pluginJars;
-        this.getEnvironment().registerPlugin(pluginJars);
+        this.pluginJars = new ArrayList<>(pluginJars);
+        this.getEnvironment().registerPlugin(this.pluginJars);
     }
 
     @Override
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 04eee536b..ef412213e 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -62,7 +62,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             seaTunnelSink.prepare(sinkConfig);
             seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
             return seaTunnelSink;
-        }).collect(Collectors.toList());
+        }).distinct().collect(Collectors.toList());
         flinkEnvironment.registerPlugin(pluginJars);
         return sinks;
     }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 6ea19fb9a..fa0cb65eb 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -44,7 +44,9 @@ import org.apache.flink.types.Row;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSource> {
 
@@ -101,7 +103,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
     protected List<SeaTunnelSource> initializePlugins(List<? extends Config> pluginConfigs) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
         List<SeaTunnelSource> sources = new ArrayList<>();
-        List<URL> jars = new ArrayList<>();
+        Set<URL> jars = new HashSet<>();
         for (Config sourceConfig : pluginConfigs) {
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
                 ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
@@ -115,7 +117,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             }
             sources.add(seaTunnelSource);
         }
-        flinkEnvironment.registerPlugin(jars);
+        flinkEnvironment.registerPlugin(new ArrayList<>(jars));
         return sources;
     }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index c4c13b852..d077f6d3a 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -55,7 +55,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Fl
                 pluginInstance.setConfig(transformConfig);
                 pluginInstance.prepare(flinkEnvironment);
                 return pluginInstance;
-            }).collect(Collectors.toList());
+            }).distinct().collect(Collectors.toList());
         flinkEnvironment.registerPlugin(pluginJars);
         return transforms;
     }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 6cc8ef1a8..c4cbce869 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -51,9 +51,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -225,7 +227,7 @@ public class SparkStarter implements Starter {
             return Collections.emptyList();
         }
         Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
-        List<URL> pluginJars = new ArrayList<>();
+        Set<URL> pluginJars = new HashSet<>();
         SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
         SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
         pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index e932587ff..974d0fcf0 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -58,7 +58,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             seaTunnelSink.prepare(sinkConfig);
             seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
             return seaTunnelSink;
-        }).collect(Collectors.toList());
+        }).distinct().collect(Collectors.toList());
         sparkEnvironment.registerPlugin(pluginJars);
         return sinks;
     }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index d436d74f0..f8f8a59b8 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -34,7 +34,9 @@ import org.apache.spark.sql.types.StructType;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSource<?, ?, ?>> {
 
@@ -65,7 +67,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
     protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(List<? extends Config> pluginConfigs) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
         List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>();
-        List<URL> jars = new ArrayList<>();
+        Set<URL> jars = new HashSet<>();
         for (Config sourceConfig : pluginConfigs) {
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
                 ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
@@ -75,7 +77,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
             sources.add(seaTunnelSource);
         }
-        sparkEnvironment.registerPlugin(jars);
+        sparkEnvironment.registerPlugin(new ArrayList<>(jars));
         return sources;
     }
 }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 945259340..5668457b7 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -55,7 +55,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Ba
                 pluginInstance.setConfig(transformConfig);
                 pluginInstance.prepare(sparkEnvironment);
                 return pluginInstance;
-            }).collect(Collectors.toList());
+            }).distinct().collect(Collectors.toList());
         sparkEnvironment.registerPlugin(pluginJars);
         return transforms;
     }