You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/07 03:04:12 UTC

[incubator-seatunnel] branch dev updated: [Hotfix][Zeta] Fix Connector load logic from zeta (#4510)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 329c43218 [Hotfix][Zeta] Fix Connector load logic from zeta (#4510)
329c43218 is described below

commit 329c432186f9dddc8df89781c666ef719398af8f
Author: Jia Fan <fa...@qq.com>
AuthorDate: Fri Apr 7 11:04:06 2023 +0800

    [Hotfix][Zeta] Fix Connector load logic from zeta (#4510)
---
 .../engine/core/parse/ConfigParserUtil.java        |  4 ++
 .../core/parse/MultipleTableJobConfigParser.java   | 55 +++++++++++++++-------
 2 files changed, 43 insertions(+), 16 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
index 3ffdc9586..a1b8ec87e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
@@ -283,6 +283,10 @@ public final class ConfigParserUtil {
         return readonlyConfig.getOptional(FACTORY_ID).orElse(readonlyConfig.get(PLUGIN_NAME));
     }
 
+    public static String getFactoryId(Config config) {
+        return getFactoryId(ReadonlyConfig.fromConfig(config));
+    }
+
     private enum VertexStatus {
         CREATED,
         LINKED
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index b4e1b424d..b4b87cb8e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -42,10 +42,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -56,6 +55,8 @@ import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -66,7 +67,6 @@ import com.hazelcast.logging.Logger;
 import lombok.extern.slf4j.Slf4j;
 import scala.Tuple2;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
 import java.nio.file.Paths;
@@ -85,6 +85,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
 import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryUrls;
@@ -126,19 +127,6 @@ public class MultipleTableJobConfigParser {
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse() {
-        List<URL> connectorJars = new ArrayList<>();
-        try {
-            connectorJars = FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
-        } catch (IOException e) {
-            LOGGER.info(e);
-        }
-        if (!commonPluginJars.isEmpty()) {
-            connectorJars.addAll(commonPluginJars);
-        }
-        ClassLoader classLoader =
-                new SeaTunnelChildFirstClassLoader(
-                        connectorJars, Thread.currentThread().getContextClassLoader());
-        Thread.currentThread().setContextClassLoader(classLoader);
         List<? extends Config> sourceConfigs =
                 TypesafeConfigUtils.getConfigList(
                         seaTunnelJobConfig, "source", Collections.emptyList());
@@ -149,6 +137,15 @@ public class MultipleTableJobConfigParser {
                 TypesafeConfigUtils.getConfigList(
                         seaTunnelJobConfig, "sink", Collections.emptyList());
 
+        List<URL> connectorJars = getConnectorJarList(sourceConfigs, sinkConfigs);
+        if (!commonPluginJars.isEmpty()) {
+            connectorJars.addAll(commonPluginJars);
+        }
+        ClassLoader classLoader =
+                new SeaTunnelChildFirstClassLoader(
+                        connectorJars, Thread.currentThread().getContextClassLoader());
+        Thread.currentThread().setContextClassLoader(classLoader);
+
         ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs);
 
         this.fillJobConfig();
@@ -184,6 +181,32 @@ public class MultipleTableJobConfigParser {
         return urls;
     }
 
+    private List<URL> getConnectorJarList(
+            List<? extends Config> sourceConfigs, List<? extends Config> sinkConfigs) {
+        List<PluginIdentifier> factoryIds =
+                Stream.concat(
+                                sourceConfigs.stream()
+                                        .map(ConfigParserUtil::getFactoryId)
+                                        .map(
+                                                factory ->
+                                                        PluginIdentifier.of(
+                                                                CollectionConstants
+                                                                        .SEATUNNEL_PLUGIN,
+                                                                CollectionConstants.SOURCE_PLUGIN,
+                                                                factory)),
+                                sinkConfigs.stream()
+                                        .map(ConfigParserUtil::getFactoryId)
+                                        .map(
+                                                factory ->
+                                                        PluginIdentifier.of(
+                                                                CollectionConstants
+                                                                        .SEATUNNEL_PLUGIN,
+                                                                CollectionConstants.SINK_PLUGIN,
+                                                                factory)))
+                        .collect(Collectors.toList());
+        return new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds);
+    }
+
     private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {
         actions.forEach(
                 action -> {