You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/20 02:53:16 UTC
[incubator-seatunnel] branch dev updated: [feature][engine] support add plugin jar (#3144)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2424cbfdf [feature][engine] support add plugin jar (#3144)
2424cbfdf is described below
commit 2424cbfdf4d95f93f1b73b4d36f6f127216f3c33
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Oct 20 10:53:10 2022 +0800
[feature][engine] support add plugin jar (#3144)
---
.../engine/client/job/JobExecutionEnvironment.java | 48 ++++++++++++++++++++--
.../engine/core/dag/actions/AbstractAction.java | 2 +-
.../engine/core/parse/ConnectorInstanceLoader.java | 16 ++++----
.../engine/core/parse/JobConfigParser.java | 31 +++++++++++---
.../engine/server/TaskExecutionService.java | 8 +++-
.../plugin/discovery/AbstractPluginDiscovery.java | 18 +++++++-
.../plugin/discovery/PluginDiscovery.java | 10 +++++
7 files changed, 112 insertions(+), 21 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index aa3d457e9..4a04245ca 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -18,8 +18,10 @@
package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
@@ -27,23 +29,38 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import java.io.IOException;
+import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class JobExecutionEnvironment {
+ private static final ILogger LOGGER = Logger.getLogger(JobExecutionEnvironment.class);
+
private final JobConfig jobConfig;
private final int maxParallelism = 1;
private final List<Action> actions = new ArrayList<>();
- private final List<URL> jarUrls = new ArrayList<>();
+ private final Set<URL> jarUrls = new HashSet<>();
+
+ private final List<URL> commonPluginJars = new ArrayList<>();
private final String jobFilePath;
@@ -61,10 +78,35 @@ public class JobExecutionEnvironment {
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.jobConfig.setJobContext(new JobContext(jobClient.getNewJobId()));
+ this.commonPluginJars.addAll(searchPluginJars());
+ LOGGER.info("add common jar in plugins :" + commonPluginJars);
+ }
+
+ /**
+ * Search all jars in SEATUNNEL_HOME/plugins
+ */
+ private Set<URL> searchPluginJars() {
+ try {
+ if (Files.exists(Common.pluginRootDir())) {
+ try (Stream<Path> paths = Files.walk(Common.pluginRootDir(), FileVisitOption.FOLLOW_LINKS)) {
+ return paths.filter(path -> path.toString().endsWith(".jar"))
+ .map(path -> {
+ try {
+ return path.toUri().toURL();
+ } catch (MalformedURLException e) {
+ throw new SeaTunnelEngineException(e);
+ }
+ }).collect(Collectors.toSet());
+ }
+ }
+ } catch (IOException | SeaTunnelEngineException e) {
+ LOGGER.warning(String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e);
+ }
+ return Collections.emptySet();
}
private JobConfigParser getJobConfigParser() {
- return new JobConfigParser(jobFilePath, idGenerator, jobConfig);
+ return new JobConfigParser(jobFilePath, idGenerator, jobConfig, commonPluginJars);
}
public void addAction(List<Action> actions) {
@@ -84,7 +126,7 @@ public class JobExecutionEnvironment {
Long.parseLong(jobConfig.getJobContext().getJobId()),
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
jobConfig,
- jarUrls);
+ new ArrayList<>(jarUrls));
return jobClient.createJobProxy(jobImmutableInformation);
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index f38046e62..33c94cea5 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -32,7 +32,7 @@ public abstract class AbstractAction implements Action {
private int parallelism = 1;
- private Set<URL> jarUrls;
+ private final Set<URL> jarUrls;
protected AbstractAction(long id, @NonNull String name, @NonNull List<Action> upstreams, @NonNull Set<URL> jarUrls) {
this.id = id;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
index 56577cb10..a71628157 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
@@ -46,8 +46,8 @@ public class ConnectorInstanceLoader {
throw new IllegalStateException("Utility class");
}
- public static ImmutablePair<SeaTunnelSource, Set<URL>> loadSourceInstance(Config sourceConfig,
- JobContext jobContext) {
+ public static ImmutablePair<SeaTunnelSource, Set<URL>> loadSourceInstance(
+ Config sourceConfig, JobContext jobContext, List<URL> pluginJars) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -56,7 +56,7 @@ public class ConnectorInstanceLoader {
List<URL> pluginJarPaths = sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
- SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
+ SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
seaTunnelSource.prepare(sourceConfig);
seaTunnelSource.setJobContext(jobContext);
if (jobContext.getJobMode() == JobMode.BATCH
@@ -68,7 +68,7 @@ public class ConnectorInstanceLoader {
}
public static ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>> loadSinkInstance(
- Config sinkConfig, JobContext jobContext) {
+ Config sinkConfig, JobContext jobContext, List<URL> pluginJars) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -76,14 +76,14 @@ public class ConnectorInstanceLoader {
sinkConfig.getString(CollectionConstants.PLUGIN_NAME));
List<URL> pluginJarPaths = sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
- sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+ sinkPluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
return new ImmutablePair<>(seaTunnelSink, new HashSet<>(pluginJarPaths));
}
- public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>> loadTransformInstance(Config transformConfig,
- JobContext jobContext) {
+ public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>> loadTransformInstance(
+ Config transformConfig, JobContext jobContext, List<URL> pluginJars) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -92,7 +92,7 @@ public class ConnectorInstanceLoader {
List<URL> pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelTransform<?> seaTunnelTransform =
- transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+ transformPluginDiscovery.createPluginInstance(pluginIdentifier, pluginJars);
seaTunnelTransform.prepare(transformConfig);
seaTunnelTransform.setJobContext(jobContext);
return new ImmutablePair<>(seaTunnelTransform, new HashSet<>(pluginJarPaths));
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 1787eda0a..92041823b 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -50,6 +50,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -81,14 +82,24 @@ public class JobConfigParser {
private Config envConfigs;
+ private List<URL> commonPluginJars;
+
public JobConfigParser(@NonNull String jobDefineFilePath,
@NonNull IdGenerator idGenerator,
@NonNull JobConfig jobConfig) {
+ this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList());
+ }
+
+ public JobConfigParser(@NonNull String jobDefineFilePath,
+ @NonNull IdGenerator idGenerator,
+ @NonNull JobConfig jobConfig,
+ @NonNull List<URL> commonPluginJars) {
this.jobDefineFilePath = jobDefineFilePath;
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
this.seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
this.envConfigs = seaTunnelJobConfig.getConfig("env");
+ this.commonPluginJars = commonPluginJars;
}
public ImmutablePair<List<Action>, Set<URL>> parse() {
@@ -109,9 +120,17 @@ public class JobConfigParser {
} else {
complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
}
+ actions.forEach(this::addCommonPluginJarsToAction);
return new ImmutablePair<>(actions, jarUrlsSet);
}
+ private void addCommonPluginJarsToAction(Action action) {
+ action.getJarUrls().addAll(commonPluginJars);
+ if (!action.getUpstream().isEmpty()) {
+ action.getUpstream().forEach(this::addCommonPluginJarsToAction);
+ }
+ }
+
private void jobConfigAnalyze(@NonNull Config envConfigs) {
if (envConfigs.hasPath("job.mode")) {
jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
@@ -138,7 +157,7 @@ public class JobConfigParser {
for (Config config : sinkConfigs) {
ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>>
sinkListImmutablePair =
- ConnectorInstanceLoader.loadSinkInstance(config, jobConfig.getJobContext());
+ ConnectorInstanceLoader.loadSinkInstance(config, jobConfig.getJobContext(), commonPluginJars);
SinkAction sinkAction =
createSinkAction(idGenerator.getNextId(), sinkListImmutablePair.getLeft().getPluginName(),
@@ -178,7 +197,7 @@ public class JobConfigParser {
AtomicInteger totalParallelism = new AtomicInteger();
for (Config sourceConfig : sourceConfigList) {
ImmutablePair<SeaTunnelSource, Set<URL>> seaTunnelSourceListImmutablePair =
- ConnectorInstanceLoader.loadSourceInstance(sourceConfig, jobConfig.getJobContext());
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfig, jobConfig.getJobContext(), commonPluginJars);
dataType = seaTunnelSourceListImmutablePair.getLeft().getProducedType();
SourceAction sourceAction = createSourceAction(
idGenerator.getNextId(),
@@ -205,7 +224,7 @@ public class JobConfigParser {
SeaTunnelDataType<?> dataTypeResult = null;
for (Config config : transformConfigList) {
ImmutablePair<SeaTunnelTransform<?>, Set<URL>> transformListImmutablePair =
- ConnectorInstanceLoader.loadTransformInstance(config, jobConfig.getJobContext());
+ ConnectorInstanceLoader.loadTransformInstance(config, jobConfig.getJobContext(), commonPluginJars);
TransformAction transformAction = createTransformAction(
idGenerator.getNextId(),
transformListImmutablePair.getLeft().getPluginName(),
@@ -269,7 +288,7 @@ public class JobConfigParser {
List<? extends Config> transformConfigs,
List<? extends Config> sinkConfigs) {
ImmutablePair<SeaTunnelSource, Set<URL>> pair =
- ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0), jobConfig.getJobContext());
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0), jobConfig.getJobContext(), commonPluginJars);
SourceAction sourceAction =
createSourceAction(idGenerator.getNextId(), pair.getLeft().getPluginName(), pair.getLeft(),
pair.getRight());
@@ -280,7 +299,7 @@ public class JobConfigParser {
if (!CollectionUtils.isEmpty(transformConfigs)) {
ImmutablePair<SeaTunnelTransform<?>, Set<URL>> transformListImmutablePair =
- ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0), jobConfig.getJobContext());
+ ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0), jobConfig.getJobContext(), commonPluginJars);
transformListImmutablePair.getLeft().setTypeInfo(dataType);
dataType = transformListImmutablePair.getLeft().getProducedType();
@@ -299,7 +318,7 @@ public class JobConfigParser {
ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>>
sinkListImmutablePair =
- ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), jobConfig.getJobContext());
+ ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), jobConfig.getJobContext(), commonPluginJars);
SinkAction sinkAction = createSinkAction(
idGenerator.getNextId(),
sinkListImmutablePair.getLeft().getPluginName(),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 44d2002a9..fea05ddd4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -202,11 +202,11 @@ public class TaskExecutionService {
taskExecutionContextMap.put(task.getTaskID(), taskExecutionContext);
})
.collect(partitioningBy(Task::isThreadsShare));
+ executionContexts.put(taskGroup.getTaskGroupLocation(), new TaskGroupContext(taskGroup, classLoader));
+ cancellationFutures.put(taskGroup.getTaskGroupLocation(), cancellationFuture);
submitThreadShareTask(executionTracker, byCooperation.get(true));
submitBlockingTask(executionTracker, byCooperation.get(false));
taskGroup.setTasksContext(taskExecutionContextMap);
- executionContexts.put(taskGroup.getTaskGroupLocation(), new TaskGroupContext(taskGroup, classLoader));
- cancellationFutures.put(taskGroup.getTaskGroupLocation(), cancellationFuture);
} catch (Throwable t) {
logger.severe(ExceptionUtils.getMessage(t));
resultFuture.completeExceptionally(t);
@@ -265,6 +265,9 @@ public class TaskExecutionService {
@Override
public void run() {
+ ClassLoader classLoader = executionContexts.get(tracker.taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
+ ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(classLoader);
final Task t = tracker.task;
try {
startedLatch.countDown();
@@ -280,6 +283,7 @@ public class TaskExecutionService {
} finally {
tracker.taskGroupExecutionTracker.taskDone();
}
+ Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index c75e45dad..59d03e5be 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -37,6 +37,8 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -94,6 +96,11 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
@Override
public T createPluginInstance(PluginIdentifier pluginIdentifier) {
+ return (T) createPluginInstance(pluginIdentifier, Collections.EMPTY_LIST);
+ }
+
+ @Override
+ public T createPluginInstance(PluginIdentifier pluginIdentifier, Collection<URL> pluginJars) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
T pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
@@ -106,10 +113,19 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
try {
// use current thread classloader to avoid different classloader load same class error.
this.addURLToClassLoader.accept(classLoader, pluginJarPath.get());
+ for (URL jar : pluginJars) {
+ addURLToClassLoader.accept(classLoader, jar);
+ }
} catch (Exception e) {
log.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
" message: " + e.getMessage());
- classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
+ URL[] urls = new URL[pluginJars.size() + 1];
+ int i = 0;
+ for (URL pluginJar : pluginJars) {
+ urls[i++] = pluginJar;
+ }
+ urls[i] = pluginJarPath.get();
+ classLoader = new URLClassLoader(urls, Thread.currentThread().getContextClassLoader());
}
pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index 8a571f92c..6cfcd5cf4 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import java.net.URL;
+import java.util.Collection;
import java.util.List;
/**
@@ -60,6 +61,15 @@ public interface PluginDiscovery<T> {
*/
T createPluginInstance(PluginIdentifier pluginIdentifier);
+ /**
+ * Get plugin instance by plugin identifier.
+ *
+ * @param pluginIdentifier plugin identifier.
+ * @param pluginJars used to help plugin load
+ * @return plugin instance. If not found, throw IllegalArgumentException.
+ */
+ T createPluginInstance(PluginIdentifier pluginIdentifier, Collection<URL> pluginJars);
+
/**
* Get all plugin instances.
*