You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/07/05 11:58:23 UTC

[incubator-seatunnel] branch dev updated: [Core][Plugin] Fix same plugin can't create twice error. (#2132)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3dd7dc0e4 [Core][Plugin] Fix same plugin can't create twice error. (#2132)
3dd7dc0e4 is described below

commit 3dd7dc0e4cdf77bb5a039d3848365561cfac0f92
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Jul 5 19:58:17 2022 +0800

    [Core][Plugin] Fix same plugin can't create twice error. (#2132)
    
    * [Core][Plugin] fix same plugin can't create twice error.
    
    * [Core][Plugin] change create plugin instance method name to createPluginInstance.
---
 .../core/flink/config/FlinkExecutionContext.java   |  6 +-
 .../core/spark/config/SparkExecutionContext.java   |  6 +-
 .../flink/execution/SinkExecuteProcessor.java      |  2 +-
 .../flink/execution/SourceExecuteProcessor.java    |  2 +-
 .../flink/execution/TransformExecuteProcessor.java |  2 +-
 .../spark/execution/SinkExecuteProcessor.java      |  2 +-
 .../spark/execution/SourceExecuteProcessor.java    |  2 +-
 .../spark/execution/TransformExecuteProcessor.java |  2 +-
 .../plugin/discovery/AbstractPluginDiscovery.java  | 71 +++++++++-------------
 .../plugin/discovery/PluginDiscovery.java          |  2 +-
 10 files changed, 43 insertions(+), 54 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
index 773feb870..78fa9b71d 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -63,7 +63,7 @@ public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
-                BaseSource<FlinkEnvironment> pluginInstance = flinkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSource<FlinkEnvironment> pluginInstance = flinkSourcePluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
@@ -77,7 +77,7 @@ public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
-                BaseTransform<FlinkEnvironment> pluginInstance = flinkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseTransform<FlinkEnvironment> pluginInstance = flinkTransformPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
@@ -91,7 +91,7 @@ public class FlinkExecutionContext extends AbstractExecutionContext<FlinkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
-                BaseSink<FlinkEnvironment> pluginInstance = flinkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSink<FlinkEnvironment> pluginInstance = flinkSinkPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
index 7effd19b3..80fd61adc 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -62,7 +62,7 @@ public class SparkExecutionContext extends AbstractExecutionContext<SparkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
-                BaseSource<SparkEnvironment> pluginInstance = sparkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSource<SparkEnvironment> pluginInstance = sparkSourcePluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
@@ -76,7 +76,7 @@ public class SparkExecutionContext extends AbstractExecutionContext<SparkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
-                BaseTransform<SparkEnvironment> pluginInstance = sparkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseTransform<SparkEnvironment> pluginInstance = sparkTransformPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
@@ -90,7 +90,7 @@ public class SparkExecutionContext extends AbstractExecutionContext<SparkEnviron
         return configList.stream()
             .map(pluginConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(engineType, pluginType, pluginConfig.getString("plugin_name"));
-                BaseSink<SparkEnvironment> pluginInstance = sparkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSink<SparkEnvironment> pluginInstance = sparkSinkPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(pluginConfig);
                 return pluginInstance;
             }).collect(Collectors.toList());
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 4744fdf7e..04eee536b 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -58,7 +58,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
             pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
-                sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+                sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
             seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
             return seaTunnelSink;
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index b097c5a8c..6ea19fb9a 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -106,7 +106,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
                 ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
             jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+            SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
             seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
             if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index ab0200e37..c4c13b852 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -51,7 +51,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Fl
             .map(transformConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
                 pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-                FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(transformConfig);
                 pluginInstance.prepare(flinkEnvironment);
                 return pluginInstance;
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 2e0096a47..e932587ff 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -54,7 +54,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
         List<SeaTunnelSink<?, ?, ?, ?>> sinks = pluginConfigs.stream().map(sinkConfig -> {
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
             pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+            SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
             seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
             return seaTunnelSink;
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index ba953ebda..d436d74f0 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -70,7 +70,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
                 ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
             jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSource<?, ?, ?> seaTunnelSource = sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+            SeaTunnelSource<?, ?, ?> seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
             seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
             sources.add(seaTunnelSource);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index faa2bacea..945259340 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -51,7 +51,7 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Ba
             .map(transformConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
                 pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-                BaseSparkTransform pluginInstance = transformPluginDiscovery.getPluginInstance(pluginIdentifier);
+                BaseSparkTransform pluginInstance = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(transformConfig);
                 pluginInstance.prepare(sparkEnvironment);
                 return pluginInstance;
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index edbd78812..ccd85bbf7 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -47,8 +47,6 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPluginDiscovery.class);
     private final Path pluginDir;
 
-    protected final ConcurrentHashMap<PluginIdentifier, Optional<T>> pluginInstanceMap =
-        new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
     protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
         new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
 
@@ -69,18 +67,41 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
     @Override
     public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
         return pluginIdentifiers.stream()
-            .map(this::getPluginInstance).distinct()
+            .map(this::createPluginInstance).distinct()
             .collect(Collectors.toList());
     }
 
     @Override
-    public T getPluginInstance(PluginIdentifier pluginIdentifier) {
-        Optional<T> pluginInstance = pluginInstanceMap
-            .computeIfAbsent(pluginIdentifier, this::createPluginInstance);
-        if (!pluginInstance.isPresent()) {
-            throw new IllegalArgumentException("Can't find plugin: " + pluginIdentifier);
+    public T createPluginInstance(PluginIdentifier pluginIdentifier) {
+        Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
+        ClassLoader classLoader;
+        // If the plugin jar does not exist in the plugin dir, load the plugin from the classpath.
+        if (pluginJarPath.isPresent()) {
+            LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier, pluginJarPath.get());
+            classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
+        } else {
+            LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+            classLoader = Thread.currentThread().getContextClassLoader();
         }
-        return pluginInstance.get();
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
+        for (T t : serviceLoader) {
+            if (t instanceof Plugin) {
+                // old api
+                Plugin<?> pluginInstance = (Plugin<?>) t;
+                if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+                    return (T) pluginInstance;
+                }
+            } else if (t instanceof PluginIdentifierInterface) {
+                // new api
+                PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
+                if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+                    return (T) pluginIdentifierInstance;
+                }
+            } else {
+                throw new UnsupportedOperationException("Plugin instance: " + t + " is not supported.");
+            }
+        }
+        throw new RuntimeException("Plugin " + pluginIdentifier + " not found.");
     }
 
     /**
@@ -146,36 +167,4 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
             return Optional.empty();
         }
     }
-
-    private Optional<T> createPluginInstance(PluginIdentifier pluginIdentifier) {
-        Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
-        ClassLoader classLoader;
-        // if the plugin jar not exist in plugin dir, will load from classpath.
-        if (pluginJarPath.isPresent()) {
-            LOGGER.info("Load plugin: {} from path: {}", pluginIdentifier, pluginJarPath.get());
-            classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
-        } else {
-            LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
-            classLoader = Thread.currentThread().getContextClassLoader();
-        }
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
-        for (T t : serviceLoader) {
-            if (t instanceof Plugin) {
-                // old api
-                Plugin<?> pluginInstance = (Plugin<?>) t;
-                if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
-                    return Optional.of((T) pluginInstance);
-                }
-            } else if (t instanceof PluginIdentifierInterface) {
-                // new api
-                PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
-                if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
-                    return Optional.of((T) pluginIdentifierInstance);
-                }
-            } else {
-                throw new UnsupportedOperationException("Plugin instance: " + t + " is not supported.");
-            }
-        }
-        return Optional.empty();
-    }
 }
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index cdc85860d..8a571f92c 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -58,7 +58,7 @@ public interface PluginDiscovery<T> {
      * @param pluginIdentifier plugin identifier.
      * @return plugin instance. If not found, a RuntimeException is thrown.
      */
-    T getPluginInstance(PluginIdentifier pluginIdentifier);
+    T createPluginInstance(PluginIdentifier pluginIdentifier);
 
     /**
      * Get all plugin instances.