You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/07/05 11:58:23 UTC
[incubator-seatunnel] branch dev updated: [Core][Plugin] Fix same plugin can't create twice error. (#2132)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3dd7dc0e4 [Core][Plugin] Fix same plugin can't create twice error. (#2132)
3dd7dc0e4 is described below
commit 3dd7dc0e4cdf77bb5a039d3848365561cfac0f92
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Jul 5 19:58:17 2022 +0800
[Core][Plugin] Fix same plugin can't create twice error. (#2132)
* [Core][Plugin] fix same plugin can't create twice error.
* [Core][Plugin] change create plugin instance method name to createPluginInstance.
---
.../core/flink/config/FlinkExecutionContext.java | 6 +-
.../core/spark/config/SparkExecutionContext.java | 6 +-
.../flink/execution/SinkExecuteProcessor.java | 2 +-
.../flink/execution/SourceExecuteProcessor.java | 2 +-
.../flink/execution/TransformExecuteProcessor.java | 2 +-
.../spark/execution/SinkExecuteProcessor.java | 2 +-
.../spark/execution/SourceExecuteProcessor.java | 2 +-
.../spark/execution/TransformExecuteProcessor.java | 2 +-
.../plugin/discovery/AbstractPluginDiscovery.java | 71 +++++++++-------------
.../plugin/discovery/PluginDiscovery.java | 2 +-
10 files changed, 43 insertions(+), 54 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
index 773feb870..78fa9b71d 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -63,7 +63,7 @@ public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnviron
return configList.stream()
.map(pluginConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
- BaseSource<FlinkEnvironment> pluginInstance = flinkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ BaseSource<FlinkEnvironment> pluginInstance = flinkSourcePluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(pluginConfig);
return pluginInstance;
}).collect(Collectors.toList());
@@ -77,7 +77,7 @@ public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnviron
return configList.stream()
.map(pluginConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
- BaseTransform<FlinkEnvironment> pluginInstance = flinkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+ BaseTransform<FlinkEnvironment> pluginInstance = flinkTransformPluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(pluginConfig);
return pluginInstance;
}).collect(Collectors.toList());
@@ -91,7 +91,7 @@ public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnviron
return configList.stream()
.map(pluginConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
- BaseSink<FlinkEnvironment> pluginInstance = flinkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ BaseSink<FlinkEnvironment> pluginInstance = flinkSinkPluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(pluginConfig);
return pluginInstance;
}).collect(Collectors.toList());
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
index 7effd19b3..80fd61adc 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -62,7 +62,7 @@ public class SparkExecutionContext extends AbstractExecutionContext<SparkEnviron
return configList.stream()
.map(pluginConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
- BaseSource<SparkEnvironment> pluginInstance = sparkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ BaseSource<SparkEnvironment> pluginInstance = sparkSourcePluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(pluginConfig);
return pluginInstance;
}).collect(Collectors.toList());
@@ -76,7 +76,7 @@ public class SparkExecutionContext extends AbstractExecutionContext<SparkEnviron
return configList.stream()
.map(pluginConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
- BaseTransform<SparkEnvironment> pluginInstance = sparkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+ BaseTransform<SparkEnvironment> pluginInstance = sparkTransformPluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(pluginConfig);
return pluginInstance;
}).collect(Collectors.toList());
@@ -90,7 +90,7 @@ public class SparkExecutionContext extends AbstractExecutionContext<SparkEnviron
return configList.stream()
.map(pluginConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
- BaseSink<SparkEnvironment> pluginInstance = sparkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ BaseSink<SparkEnvironment> pluginInstance = sparkSinkPluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(pluginConfig);
return pluginInstance;
}).collect(Collectors.toList());
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 4744fdf7e..04eee536b 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -58,7 +58,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
- sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
return seaTunnelSink;
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index b097c5a8c..6ea19fb9a 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -106,7 +106,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index ab0200e37..c4c13b852 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -51,7 +51,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Fl
.map(transformConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+ FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(transformConfig);
pluginInstance.prepare(flinkEnvironment);
return pluginInstance;
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 2e0096a47..e932587ff 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -54,7 +54,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
List<SeaTunnelSink<?, ?, ?, ?>> sinks = pluginConfigs.stream().map(sinkConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
return seaTunnelSink;
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index ba953ebda..d436d74f0 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -70,7 +70,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSource<?, ?, ?> seaTunnelSource = sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ SeaTunnelSource<?, ?, ?> seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
sources.add(seaTunnelSource);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index faa2bacea..945259340 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -51,7 +51,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Ba
.map(transformConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- BaseSparkTransform pluginInstance = transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+ BaseSparkTransform pluginInstance = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(transformConfig);
pluginInstance.prepare(sparkEnvironment);
return pluginInstance;
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index edbd78812..ccd85bbf7 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -47,8 +47,6 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPluginDiscovery.class);
private final Path pluginDir;
- protected final ConcurrentHashMap<PluginIdentifier, Optional<T>> pluginInstanceMap =
- new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
@@ -69,18 +67,41 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
@Override
public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
return pluginIdentifiers.stream()
- .map(this::getPluginInstance).distinct()
+ .map(this::createPluginInstance).distinct()
.collect(Collectors.toList());
}
@Override
- public T getPluginInstance(PluginIdentifier pluginIdentifier) {
- Optional<T> pluginInstance = pluginInstanceMap
- .computeIfAbsent(pluginIdentifier, this::createPluginInstance);
- if (!pluginInstance.isPresent()) {
- throw new IllegalArgumentException("Can't find plugin: " + pluginIdentifier);
+ public T createPluginInstance(PluginIdentifier pluginIdentifier) {
+ Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
+ ClassLoader classLoader;
+ // if the plugin jar not exist in plugin dir, will load from classpath.
+ if (pluginJarPath.isPresent()) {
+ LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier, pluginJarPath.get());
+ classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
+ } else {
+ LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+ classLoader = Thread.currentThread().getContextClassLoader();
}
- return pluginInstance.get();
+ ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
+ for (T t : serviceLoader) {
+ if (t instanceof Plugin) {
+ // old api
+ Plugin<?> pluginInstance = (Plugin<?>) t;
+ if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+ return (T) pluginInstance;
+ }
+ } else if (t instanceof PluginIdentifierInterface) {
+ // new api
+ PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
+ if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+ return (T) pluginIdentifierInstance;
+ }
+ } else {
+ throw new UnsupportedOperationException("Plugin instance: " + t + " is not supported.");
+ }
+ }
+ throw new RuntimeException("Plugin " + pluginIdentifier + " not found.");
}
/**
@@ -146,36 +167,4 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
return Optional.empty();
}
}
-
- private Optional<T> createPluginInstance(PluginIdentifier pluginIdentifier) {
- Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
- ClassLoader classLoader;
- // if the plugin jar not exist in plugin dir, will load from classpath.
- if (pluginJarPath.isPresent()) {
- LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier, pluginJarPath.get());
- classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
- } else {
- LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
- classLoader = Thread.currentThread().getContextClassLoader();
- }
- ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
- for (T t : serviceLoader) {
- if (t instanceof Plugin) {
- // old api
- Plugin<?> pluginInstance = (Plugin<?>) t;
- if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
- return Optional.of((T) pluginInstance);
- }
- } else if (t instanceof PluginIdentifierInterface) {
- // new api
- PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
- if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
- return Optional.of((T) pluginIdentifierInstance);
- }
- } else {
- throw new UnsupportedOperationException("Plugin instance: " + t + " is not supported.");
- }
- }
- return Optional.empty();
- }
}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index cdc85860d..8a571f92c 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -58,7 +58,7 @@ public interface PluginDiscovery<T> {
* @param pluginIdentifier plugin identifier.
* @return plugin instance. If not found, throws RuntimeException.
*/
- T getPluginInstance(PluginIdentifier pluginIdentifier);
+ T createPluginInstance(PluginIdentifier pluginIdentifier);
/**
* Get all plugin instances.