You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/08 09:24:58 UTC
[incubator-seatunnel] branch dev updated: [Core][Starter] Fix same source and sink register plugin set twice (#2151)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new eb179cf2f [Core][Starter] Fix same source and sink register plugin set twice (#2151)
eb179cf2f is described below
commit eb179cf2fb05e0c1c8d5848c89e901dfc2b7fb7e
Author: Hisoka <fa...@qq.com>
AuthorDate: Fri Jul 8 17:24:52 2022 +0800
[Core][Starter] Fix same source and sink register plugin set twice (#2151)
---
.../apache/seatunnel/core/flink/config/FlinkExecutionContext.java | 8 +++++---
.../main/java/org/apache/seatunnel/core/spark/SparkStarter.java | 4 +++-
.../apache/seatunnel/core/spark/config/SparkExecutionContext.java | 8 +++++---
.../core/starter/flink/execution/SinkExecuteProcessor.java | 2 +-
.../core/starter/flink/execution/SourceExecuteProcessor.java | 6 ++++--
.../core/starter/flink/execution/TransformExecuteProcessor.java | 2 +-
.../org/apache/seatunnel/core/starter/spark/SparkStarter.java | 4 +++-
.../core/starter/spark/execution/SinkExecuteProcessor.java | 2 +-
.../core/starter/spark/execution/SourceExecuteProcessor.java | 6 ++++--
.../core/starter/spark/execution/TransformExecuteProcessor.java | 2 +-
10 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
index 78fa9b71d..2acb83699 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -33,7 +33,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.net.URL;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnvironment> {
@@ -47,12 +49,12 @@ public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnviron
this.flinkSourcePluginDiscovery = new FlinkSourcePluginDiscovery();
this.flinkTransformPluginDiscovery = new FlinkTransformPluginDiscovery();
this.flinkSinkPluginDiscovery = new FlinkSinkPluginDiscovery();
- List<URL> pluginJars = new ArrayList<>();
+ Set<URL> pluginJars = new HashSet<>();
// since we didn't split the transform plugin jars, we just need to register the source/sink plugin jars
pluginJars.addAll(flinkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
pluginJars.addAll(flinkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
- this.pluginJars = pluginJars;
- this.getEnvironment().registerPlugin(pluginJars);
+ this.pluginJars = new ArrayList<>(pluginJars);
+ this.getEnvironment().registerPlugin(this.pluginJars);
}
@Override
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 0fd70b629..e8c9855f1 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -50,9 +50,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -213,7 +215,7 @@ public class SparkStarter implements Starter {
return Collections.emptyList();
}
Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
- List<URL> pluginJars = new ArrayList<>();
+ Set<URL> pluginJars = new HashSet<>();
SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
index 80fd61adc..07854b83c 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -33,7 +33,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.net.URL;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
public class SparkExecutionContext extends AbstractExecutionContext<SparkEnvironment> {
@@ -47,11 +49,11 @@ public class SparkExecutionContext extends AbstractExecutionContext<SparkEnviron
this.sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
this.sparkTransformPluginDiscovery = new SparkTransformPluginDiscovery();
this.sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
- List<URL> pluginJars = new ArrayList<>();
+ Set<URL> pluginJars = new HashSet<>();
pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
- this.pluginJars = pluginJars;
- this.getEnvironment().registerPlugin(pluginJars);
+ this.pluginJars = new ArrayList<>(pluginJars);
+ this.getEnvironment().registerPlugin(this.pluginJars);
}
@Override
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 04eee536b..ef412213e 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -62,7 +62,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
return seaTunnelSink;
- }).collect(Collectors.toList());
+ }).distinct().collect(Collectors.toList());
flinkEnvironment.registerPlugin(pluginJars);
return sinks;
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 6ea19fb9a..fa0cb65eb 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -44,7 +44,9 @@ import org.apache.flink.types.Row;
import java.net.URL;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSource> {
@@ -101,7 +103,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
protected List<SeaTunnelSource> initializePlugins(List<? extends Config> pluginConfigs) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
List<SeaTunnelSource> sources = new ArrayList<>();
- List<URL> jars = new ArrayList<>();
+ Set<URL> jars = new HashSet<>();
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
@@ -115,7 +117,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
}
sources.add(seaTunnelSource);
}
- flinkEnvironment.registerPlugin(jars);
+ flinkEnvironment.registerPlugin(new ArrayList<>(jars));
return sources;
}
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index c4c13b852..d077f6d3a 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -55,7 +55,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Fl
pluginInstance.setConfig(transformConfig);
pluginInstance.prepare(flinkEnvironment);
return pluginInstance;
- }).collect(Collectors.toList());
+ }).distinct().collect(Collectors.toList());
flinkEnvironment.registerPlugin(pluginJars);
return transforms;
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 6cc8ef1a8..c4cbce869 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -51,9 +51,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -225,7 +227,7 @@ public class SparkStarter implements Starter {
return Collections.emptyList();
}
Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
- List<URL> pluginJars = new ArrayList<>();
+ Set<URL> pluginJars = new HashSet<>();
SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index e932587ff..974d0fcf0 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -58,7 +58,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
return seaTunnelSink;
- }).collect(Collectors.toList());
+ }).distinct().collect(Collectors.toList());
sparkEnvironment.registerPlugin(pluginJars);
return sinks;
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index d436d74f0..f8f8a59b8 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -34,7 +34,9 @@ import org.apache.spark.sql.types.StructType;
import java.net.URL;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSource<?, ?, ?>> {
@@ -65,7 +67,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(List<? extends Config> pluginConfigs) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>();
- List<URL> jars = new ArrayList<>();
+ Set<URL> jars = new HashSet<>();
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
@@ -75,7 +77,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
sources.add(seaTunnelSource);
}
- sparkEnvironment.registerPlugin(jars);
+ sparkEnvironment.registerPlugin(new ArrayList<>(jars));
return sources;
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 945259340..5668457b7 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -55,7 +55,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Ba
pluginInstance.setConfig(transformConfig);
pluginInstance.prepare(sparkEnvironment);
return pluginInstance;
- }).collect(Collectors.toList());
+ }).distinct().collect(Collectors.toList());
sparkEnvironment.registerPlugin(pluginJars);
return transforms;
}