You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/07 03:04:12 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][Zeta] Fix Connector load logic from zeta (#4510)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 329c43218 [Hotfix][Zeta] Fix Connector load logic from zeta (#4510)
329c43218 is described below
commit 329c432186f9dddc8df89781c666ef719398af8f
Author: Jia Fan <fa...@qq.com>
AuthorDate: Fri Apr 7 11:04:06 2023 +0800
[Hotfix][Zeta] Fix Connector load logic from zeta (#4510)
---
.../engine/core/parse/ConfigParserUtil.java | 4 ++
.../core/parse/MultipleTableJobConfigParser.java | 55 +++++++++++++++-------
2 files changed, 43 insertions(+), 16 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
index 3ffdc9586..a1b8ec87e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java
@@ -283,6 +283,10 @@ public final class ConfigParserUtil {
return readonlyConfig.getOptional(FACTORY_ID).orElse(readonlyConfig.get(PLUGIN_NAME));
}
+ public static String getFactoryId(Config config) {
+ return getFactoryId(ReadonlyConfig.fromConfig(config));
+ }
+
private enum VertexStatus {
CREATED,
LINKED
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index b4e1b424d..b4b87cb8e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -42,10 +42,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -56,6 +55,8 @@ import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -66,7 +67,6 @@ import com.hazelcast.logging.Logger;
import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
-import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.nio.file.Paths;
@@ -85,6 +85,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryUrls;
@@ -126,19 +127,6 @@ public class MultipleTableJobConfigParser {
}
public ImmutablePair<List<Action>, Set<URL>> parse() {
- List<URL> connectorJars = new ArrayList<>();
- try {
- connectorJars = FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
- } catch (IOException e) {
- LOGGER.info(e);
- }
- if (!commonPluginJars.isEmpty()) {
- connectorJars.addAll(commonPluginJars);
- }
- ClassLoader classLoader =
- new SeaTunnelChildFirstClassLoader(
- connectorJars, Thread.currentThread().getContextClassLoader());
- Thread.currentThread().setContextClassLoader(classLoader);
List<? extends Config> sourceConfigs =
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "source", Collections.emptyList());
@@ -149,6 +137,15 @@ public class MultipleTableJobConfigParser {
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "sink", Collections.emptyList());
+ List<URL> connectorJars = getConnectorJarList(sourceConfigs, sinkConfigs);
+ if (!commonPluginJars.isEmpty()) {
+ connectorJars.addAll(commonPluginJars);
+ }
+ ClassLoader classLoader =
+ new SeaTunnelChildFirstClassLoader(
+ connectorJars, Thread.currentThread().getContextClassLoader());
+ Thread.currentThread().setContextClassLoader(classLoader);
+
ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs);
this.fillJobConfig();
@@ -184,6 +181,32 @@ public class MultipleTableJobConfigParser {
return urls;
}
+ private List<URL> getConnectorJarList(
+ List<? extends Config> sourceConfigs, List<? extends Config> sinkConfigs) {
+ List<PluginIdentifier> factoryIds =
+ Stream.concat(
+ sourceConfigs.stream()
+ .map(ConfigParserUtil::getFactoryId)
+ .map(
+ factory ->
+ PluginIdentifier.of(
+ CollectionConstants
+ .SEATUNNEL_PLUGIN,
+ CollectionConstants.SOURCE_PLUGIN,
+ factory)),
+ sinkConfigs.stream()
+ .map(ConfigParserUtil::getFactoryId)
+ .map(
+ factory ->
+ PluginIdentifier.of(
+ CollectionConstants
+ .SEATUNNEL_PLUGIN,
+ CollectionConstants.SINK_PLUGIN,
+ factory)))
+ .collect(Collectors.toList());
+ return new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds);
+ }
+
private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {
actions.forEach(
action -> {