You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/02 10:34:40 UTC
[incubator-seatunnel] branch st-engine updated: [ST-Engine] Add Submit Job From Client To Server (#2301)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 3ce56e465 [ST-Engine] Add Submit Job From Client To Server (#2301)
3ce56e465 is described below
commit 3ce56e4657006f43905272acbe252cdb3ff6178d
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Aug 2 18:34:35 2022 +0800
[ST-Engine] Add Submit Job From Client To Server (#2301)
* Add logicaldag generator
* merge from upstream
* fix UT error
* Add Job Submit To Server
* Add license header
add plugin jars url to JobInformation
fix style
reset AbstractPluginDiscovery to upstream
fix stye
* Fix problems in code review
* Fix problems in code review
---
LICENSE | 4 +-
.../engine/client/ConnectorInstanceLoader.java | 27 +++-
...SeaTunnelClientInstance.java => JobClient.java} | 24 +--
.../seatunnel/engine/client/JobConfigParser.java | 178 +++++++++++++++------
.../engine/client/JobExecutionEnvironment.java | 38 ++++-
.../apache/seatunnel/engine/client/JobProxy.java | 52 ++++++
.../seatunnel/engine/client/SeaTunnelClient.java | 58 +------
.../engine/client/SeaTunnelClientConfig.java | 5 +-
.../engine/client/SeaTunnelClientInstance.java | 12 +-
...elClient.java => SeaTunnelHazelcastClient.java} | 71 ++++----
.../engine/client/JobConfigParserTest.java | 9 +-
.../engine/client/LogicalDagGeneratorTest.java | 17 +-
.../engine/client/SeaTunnelClientTest.java | 26 ++-
.../apache/seatunnel/engine/common/Constant.java | 4 +
.../serializeable/ConfigDataSerializerHook.java | 3 +-
.../engine/common/utils/ExceptionUtil.java | 4 +-
.../engine/core/dag/actions/AbstractAction.java | 14 +-
.../seatunnel/engine/core/dag/actions/Action.java | 3 +
.../core/dag/actions/PartitionTransformAction.java | 11 +-
.../engine/core/dag/actions/SinkAction.java | 11 +-
.../engine/core/dag/actions/SourceAction.java | 7 +-
.../engine/core/dag/actions/TransformAction.java | 11 +-
.../org/apache/seatunnel/engine/core/job/Job.java} | 8 +-
.../engine/core/job/JobImmutableInformation.java | 104 ++++++++++++
.../protocol/codec/SeaTunnelPrintMessageCodec.java | 11 +-
...sageCodec.java => SeaTunnelSubmitJobCodec.java} | 48 +++---
.../core/serializable/JobDataSerializerHook.java | 8 +
.../SeaTunnelEngine.yaml | 15 ++
.../seatunnel/engine/server/SeaTunnelServer.java | 35 +++-
.../engine/server/SeaTunnelServerStarter.java | 3 +
.../engine/server/operation/AsyncOperation.java | 108 +++++++++++++
.../server/operation/SubmitJobOperation.java | 64 ++++++++
...Task.java => AbstractSeaTunnelMessageTask.java} | 62 ++++---
.../server/protocol/task/PrintMessageTask.java | 51 +-----
.../task/SeaTunnelMessageTaskFactoryProvider.java | 2 +
...TaskFactoryProvider.java => SubmitJobTask.java} | 37 +++--
.../serializable/OperationDataSerializerHook.java | 4 +
.../plugin/discovery/AbstractPluginDiscovery.java | 23 +--
38 files changed, 844 insertions(+), 328 deletions(-)
diff --git a/LICENSE b/LICENSE
index 09e15e69a..7d4d93ac3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -217,4 +217,6 @@ seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
\ No newline at end of file
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java from https://github.com/hazelcast/hazelcast
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
index 7791cbec4..0a72fde2d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
@@ -31,16 +31,28 @@ import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginD
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import java.net.URL;
+import java.util.List;
+
import scala.Serializable;
public class ConnectorInstanceLoader {
- public static SeaTunnelSource loadSourceInstance(Config sourceConfig) {
+ private ConnectorInstanceLoader() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static ImmutablePair<SeaTunnelSource, List<URL>> loadSourceInstance(Config sourceConfig) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
CollectionConstants.SOURCE_PLUGIN,
sourceConfig.getString(CollectionConstants.PLUGIN_NAME));
+ List<URL> pluginJarPaths = sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
+
SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
@@ -49,30 +61,33 @@ public class ConnectorInstanceLoader {
throw new UnsupportedOperationException(
String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
}
- return seaTunnelSource;
+ return new ImmutablePair<>(seaTunnelSource, pluginJarPaths);
}
- public static SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> loadSinkInstance(
+ public static ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, List<URL>> loadSinkInstance(
Config sinkConfig) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
CollectionConstants.SINK_PLUGIN,
sinkConfig.getString(CollectionConstants.PLUGIN_NAME));
+ List<URL> pluginJarPaths = sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
- return seaTunnelSink;
+ return new ImmutablePair<>(seaTunnelSink, pluginJarPaths);
}
- public static SeaTunnelTransform loadTransformInstance(Config transformConfig) {
+ public static ImmutablePair<SeaTunnelTransform, List<URL>> loadTransformInstance(Config transformConfig) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
CollectionConstants.TRANSFORM_PLUGIN,
transformConfig.getString(CollectionConstants.PLUGIN_NAME));
+
+ List<URL> pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelTransform seaTunnelTransform = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
- return seaTunnelTransform;
+ return new ImmutablePair<>(seaTunnelTransform, pluginJarPaths);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
index 1e0db46dd..5eb391b31 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
@@ -17,19 +17,23 @@
package org.apache.seatunnel.engine.client;
-import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import com.hazelcast.core.HazelcastInstance;
import lombok.NonNull;
-public interface SeaTunnelClientInstance {
+public class JobClient {
+ private SeaTunnelHazelcastClient hazelcastClient;
- /**
- * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
- * be a client, depending on the type of this
- */
- @NonNull
- HazelcastInstance getHazelcastInstance();
+ public JobClient(@NonNull SeaTunnelHazelcastClient hazelcastClient) {
+ this.hazelcastClient = hazelcastClient;
+ }
- JobExecutionEnvironment createExecutionContext(String filePath, JobConfig config);
+ public long getNewJobId() {
+ return hazelcastClient.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+ }
+
+ public JobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
+ return new JobProxy(hazelcastClient, jobImmutableInformation);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
index 7ddedd792..42d563225 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -40,12 +40,16 @@ import com.hazelcast.logging.Logger;
import lombok.Data;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import scala.Serializable;
@@ -63,22 +67,32 @@ public class JobConfigParser {
private Map<String, List<Config>> sourceResultTableNameMap = new HashMap<>();
+ private List<Action> actions = new ArrayList<>();
+ private Set<URL> jarUrlsSet = new HashSet<>();
+
protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator) {
this.jobDefineFilePath = jobDefineFilePath;
this.idGenerator = idGenerator;
}
- public List<Action> parse() {
+ public ImmutablePair<List<Action>, Set<URL>> parse() {
Config seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
List<? extends Config> transformConfigs = seaTunnelJobConfig.getConfigList("transform");
List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
- if (sinkConfigs.size() == 1 && sourceConfigs.size() == 1 & transformConfigs.size() <= 1) {
- return sampleAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
+ if (CollectionUtils.isEmpty(sinkConfigs) || CollectionUtils.isEmpty(sourceConfigs)) {
+ throw new JobDefineCheckExceptionSeaTunnel("Source And Sink can not be null");
+ }
+
+ if (sinkConfigs.size() == 1
+ && sourceConfigs.size() == 1
+ && (CollectionUtils.isEmpty(transformConfigs) || transformConfigs.size() == 1)) {
+ sampleAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
} else {
- return complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
+ complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
}
+ return new ImmutablePair<>(actions, jarUrlsSet);
}
/**
@@ -88,18 +102,20 @@ public class JobConfigParser {
* result_table_name and source_table_name are necessary for Transform Connector.
* By the end, source_table_name is necessary for Sink Connector.
*/
- private List<Action> complexAnalyze(List<? extends Config> sourceConfigs,
- List<? extends Config> transformConfigs,
- List<? extends Config> sinkConfigs) {
+ private void complexAnalyze(List<? extends Config> sourceConfigs,
+ List<? extends Config> transformConfigs,
+ List<? extends Config> sinkConfigs) {
initRelationMap(sourceConfigs, transformConfigs);
- List<Action> actions = new ArrayList<>();
for (Config config : sinkConfigs) {
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
+ ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, List<URL>>
+ sinkListImmutablePair =
ConnectorInstanceLoader.loadSinkInstance(config);
SinkAction sinkAction =
- new SinkAction(idGenerator.getNextId(), seaTunnelSink.getPluginName(), seaTunnelSink);
+ createSinkAction(idGenerator.getNextId(), sinkListImmutablePair.getLeft().getPluginName(),
+ sinkListImmutablePair.getLeft(), sinkListImmutablePair.getRight());
+
actions.add(sinkAction);
if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
throw new JobDefineCheckExceptionSeaTunnel(Plugin.SOURCE_TABLE_NAME
@@ -118,7 +134,6 @@ public class JobConfigParser {
}
}
- return actions;
}
private void sourceAnalyze(String sourceTableName, Action action) {
@@ -132,10 +147,15 @@ public class JobConfigParser {
// of its upstream action.
AtomicInteger totalParallelism = new AtomicInteger();
sourceConfigList.stream().forEach(sourceConfig -> {
- SourceAction sourceAction =
- new SourceAction(idGenerator.getNextId(),
- sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
- ConnectorInstanceLoader.loadSourceInstance(sourceConfig));
+ ImmutablePair<SeaTunnelSource, List<URL>> seaTunnelSourceListImmutablePair =
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfig);
+
+ SourceAction sourceAction = createSourceAction(
+ idGenerator.getNextId(),
+ sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+ seaTunnelSourceListImmutablePair.getLeft(),
+ seaTunnelSourceListImmutablePair.getRight());
+
int sourceParallelism = getSourceParallelism(sourceConfig);
sourceAction.setParallelism(sourceParallelism);
totalParallelism.set(totalParallelism.get() + sourceParallelism);
@@ -152,10 +172,15 @@ public class JobConfigParser {
} else {
AtomicInteger totalParallelism = new AtomicInteger();
transformConfigList.stream().forEach(config -> {
- SeaTunnelTransform seaTunnelTransform = ConnectorInstanceLoader.loadTransformInstance(config);
- TransformAction transformAction =
- new TransformAction(idGenerator.getNextId(), seaTunnelTransform.getPluginName(),
- seaTunnelTransform);
+ ImmutablePair<SeaTunnelTransform, List<URL>> transformListImmutablePair =
+ ConnectorInstanceLoader.loadTransformInstance(config);
+
+ TransformAction transformAction = createTransformAction(
+ idGenerator.getNextId(),
+ transformListImmutablePair.getLeft().getPluginName(),
+ transformListImmutablePair.getLeft(),
+ transformListImmutablePair.getRight());
+
action.addUpstream(transformAction);
transformAnalyze(config.getString(Plugin.SOURCE_TABLE_NAME), transformAction);
totalParallelism.set(totalParallelism.get() + transformAction.getParallelism());
@@ -211,44 +236,47 @@ public class JobConfigParser {
* |
* Sink
*/
- private List<Action> sampleAnalyze(List<? extends Config> sourceConfigs,
- List<? extends Config> transformConfigs,
- List<? extends Config> sinkConfigs) {
- SeaTunnelSource seaTunnelSource = ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
+ private void sampleAnalyze(List<? extends Config> sourceConfigs,
+ List<? extends Config> transformConfigs,
+ List<? extends Config> sinkConfigs) {
+ ImmutablePair<SeaTunnelSource, List<URL>> pair =
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
SourceAction sourceAction =
- new SourceAction(idGenerator.getNextId(), seaTunnelSource.getPluginName(), seaTunnelSource);
+ createSourceAction(idGenerator.getNextId(), pair.getLeft().getPluginName(), pair.getLeft(),
+ pair.getRight());
sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
- if (transformConfigs.size() != 0) {
- SeaTunnelTransform seaTunnelTransform =
+
+ ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, List<URL>>
+ sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
+
+ Action sinkUpstreamAction = sourceAction;
+
+ if (!CollectionUtils.isEmpty(transformConfigs)) {
+ ImmutablePair<SeaTunnelTransform, List<URL>> transformListImmutablePair =
ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0));
- TransformAction transformAction = new TransformAction(
+
+ TransformAction transformAction = createTransformAction(
idGenerator.getNextId(),
- seaTunnelTransform.getPluginName(),
+ transformListImmutablePair.getLeft().getPluginName(),
Lists.newArrayList(sourceAction),
- seaTunnelTransform);
+ transformListImmutablePair.getLeft(),
+ transformListImmutablePair.getRight());
- initTransformParallelism(transformConfigs, sourceAction, seaTunnelTransform, transformAction);
+ initTransformParallelism(transformConfigs, sourceAction, transformListImmutablePair.getLeft(),
+ transformAction);
- SeaTunnelSink seaTunnelSink = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
- SinkAction sinkAction = new SinkAction(
- idGenerator.getNextId(),
- seaTunnelSink.getPluginName(),
- Lists.newArrayList(transformAction),
- seaTunnelSink
- );
- sinkAction.setParallelism(transformAction.getParallelism());
- return Lists.newArrayList(sinkAction);
- } else {
- SeaTunnelSink seaTunnelSink = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
- SinkAction sinkAction = new SinkAction(
- idGenerator.getNextId(),
- seaTunnelSink.getPluginName(),
- Lists.newArrayList(sourceAction),
- seaTunnelSink
- );
- sinkAction.setParallelism(sourceAction.getParallelism());
- return Lists.newArrayList(sinkAction);
+ sinkUpstreamAction = transformAction;
}
+
+ SinkAction sinkAction = createSinkAction(
+ idGenerator.getNextId(),
+ sinkListImmutablePair.getLeft().getPluginName(),
+ Lists.newArrayList(sinkUpstreamAction),
+ sinkListImmutablePair.getLeft(),
+ sinkListImmutablePair.getRight()
+ );
+ sinkAction.setParallelism(sinkUpstreamAction.getParallelism());
+ actions.add(sinkAction);
}
private void initTransformParallelism(List<? extends Config> transformConfigs, Action upstreamAction,
@@ -271,4 +299,56 @@ public class JobConfigParser {
}
return 1;
}
+
+ private SourceAction createSourceAction(int id,
+ @NonNull String name,
+ @NonNull SeaTunnelSource source,
+ List<URL> jarUrls) {
+ if (!CollectionUtils.isEmpty(jarUrls)) {
+ jarUrlsSet.addAll(jarUrls);
+ }
+ return new SourceAction(id, name, source, jarUrls);
+ }
+
+ private TransformAction createTransformAction(int id,
+ @NonNull String name,
+ @NonNull List<Action> upstreams,
+ @NonNull SeaTunnelTransform transformation,
+ List<URL> jarUrls) {
+ if (!CollectionUtils.isEmpty(jarUrls)) {
+ jarUrlsSet.addAll(jarUrls);
+ }
+ return new TransformAction(id, name, upstreams, transformation, jarUrls);
+ }
+
+ private SinkAction createSinkAction(int id,
+ @NonNull String name,
+ @NonNull List<Action> upstreams,
+ @NonNull SeaTunnelSink sink,
+ List<URL> jarUrls) {
+ if (!CollectionUtils.isEmpty(jarUrls)) {
+ jarUrlsSet.addAll(jarUrls);
+ }
+ return new SinkAction(id, name, upstreams, sink, jarUrls);
+ }
+
+ private TransformAction createTransformAction(int id,
+ @NonNull String name,
+ @NonNull SeaTunnelTransform transformation,
+ List<URL> jarUrls) {
+ if (!CollectionUtils.isEmpty(jarUrls)) {
+ jarUrlsSet.addAll(jarUrls);
+ }
+ return new TransformAction(id, name, transformation, jarUrls);
+ }
+
+ private SinkAction createSinkAction(int id,
+ @NonNull String name,
+ @NonNull SeaTunnelSink sink,
+ List<URL> jarUrls) {
+ if (!CollectionUtils.isEmpty(jarUrls)) {
+ jarUrlsSet.addAll(jarUrls);
+ }
+ return new SinkAction(id, name, sink, jarUrls);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index b00bbcc27..79d60f19a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -21,10 +21,16 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
public class JobExecutionEnvironment {
@@ -36,17 +42,23 @@ public class JobExecutionEnvironment {
private List<Action> actions = new ArrayList<>();
+ private List<URL> jarUrls = new ArrayList<>();
+
private String jobFilePath;
private IdGenerator idGenerator;
- public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath) {
+ private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+
+ public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath,
+ SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
this.jobConfig = jobConfig;
this.jobFilePath = jobFilePath;
this.idGenerator = new IdGenerator();
+ this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
}
- public JobConfigParser getJobConfigParser() {
+ private JobConfigParser getJobConfigParser() {
return new JobConfigParser(jobFilePath, idGenerator);
}
@@ -54,12 +66,32 @@ public class JobExecutionEnvironment {
this.actions.addAll(actions);
}
- public LogicalDagGenerator getLogicalDagGenerator() {
+ private LogicalDagGenerator getLogicalDagGenerator() {
return new LogicalDagGenerator(actions, jobConfig, idGenerator);
}
public List<Action> getActions() {
return actions;
}
+
+ public JobProxy execute() {
+ JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
+ JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
+ jobClient.getNewJobId(),
+ seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
+ jobConfig,
+ jarUrls);
+
+ JobProxy jobProxy = jobClient.createJobProxy(jobImmutableInformation);
+ jobProxy.submitJob();
+ return jobProxy;
+ }
+
+ public LogicalDag getLogicalDag() {
+ ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+ actions.addAll(immutablePair.getLeft());
+ jarUrls.addAll(immutablePair.getRight());
+ return getLogicalDagGenerator().generate();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
new file mode 100644
index 000000000..ab45b5cfa
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.engine.core.job.Job;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import lombok.NonNull;
+
+import java.util.concurrent.CompletableFuture;
+
+public class JobProxy implements Job {
+ private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+ private JobImmutableInformation jobImmutableInformation;
+
+ public JobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
+ @NonNull JobImmutableInformation jobImmutableInformation) {
+ this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
+ this.jobImmutableInformation = jobImmutableInformation;
+ }
+
+ @Override
+ public long getJobId() {
+ return jobImmutableInformation.getJobId();
+ }
+
+ @Override
+ public void submitJob() {
+ ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(
+ seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
+ CompletableFuture<Void> voidCompletableFuture =
+ seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
+ voidCompletableFuture.join();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 4221ed873..5aa194ce7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -18,76 +18,34 @@
package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
-import com.hazelcast.client.HazelcastClient;
-import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
-import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
-import com.hazelcast.client.impl.protocol.ClientMessage;
-import com.hazelcast.client.impl.spi.impl.ClientInvocation;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.internal.serialization.SerializationService;
-import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.logging.ILogger;
import lombok.NonNull;
-import java.util.UUID;
-import java.util.function.Function;
-
public class SeaTunnelClient implements SeaTunnelClientInstance {
- private final HazelcastClientInstanceImpl hazelcastClient;
- private final SerializationService serializationService;
+ private SeaTunnelHazelcastClient hazelcastClient;
public SeaTunnelClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
- Preconditions.checkNotNull(seaTunnelClientConfig, "config");
- this.hazelcastClient =
- ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
- this.serializationService = hazelcastClient.getSerializationService();
- ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
+ this.hazelcastClient = new SeaTunnelHazelcastClient(seaTunnelClientConfig);
}
- @NonNull
@Override
- public HazelcastInstance getHazelcastInstance() {
- return hazelcastClient;
+ public JobExecutionEnvironment createExecutionContext(@NonNull String filePath, JobConfig jobConfig) {
+ return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
}
@Override
- public JobExecutionEnvironment createExecutionContext(@NonNull String filePath, JobConfig jobConfig) {
- JobExecutionEnvironment jobExecutionEnv = new JobExecutionEnvironment(jobConfig, filePath);
- jobExecutionEnv.addAction(jobExecutionEnv.getJobConfigParser().parse());
- return jobExecutionEnv;
+ public JobClient createJobClient() {
+ return new JobClient(hazelcastClient);
}
public ILogger getLogger() {
- return hazelcastClient.getLoggingService().getLogger(getClass());
- }
-
- private <S> S invokeRequestOnMasterAndDecodeResponse(ClientMessage request,
- Function<ClientMessage, Object> decoder) {
- UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
- return invokeRequestAndDecodeResponse(masterUuid, request, decoder);
- }
-
- private <S> S invokeRequestOnAnyMemberAndDecodeResponse(ClientMessage request,
- Function<ClientMessage, Object> decoder) {
- return invokeRequestAndDecodeResponse(null, request, decoder);
- }
-
- private <S> S invokeRequestAndDecodeResponse(UUID uuid, ClientMessage request,
- Function<ClientMessage, Object> decoder) {
- ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
- try {
- ClientMessage response = invocation.invoke().get();
- return serializationService.toObject(decoder.apply(response));
- } catch (Throwable t) {
- throw ExceptionUtil.rethrow(t);
- }
+ return hazelcastClient.getLogger(getClass());
}
public String printMessageToMaster(@NonNull String msg) {
- return invokeRequestOnMasterAndDecodeResponse(
+ return hazelcastClient.requestOnMasterAndDecodeResponse(
SeaTunnelPrintMessageCodec.encodeRequest(msg),
response -> SeaTunnelPrintMessageCodec.decodeResponse(response)
);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
index aa15c138a..c6c6972a3 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.engine.common.Constant;
+
import com.hazelcast.client.config.ClientConfig;
public class SeaTunnelClientConfig extends ClientConfig {
@@ -25,9 +27,8 @@ public class SeaTunnelClientConfig extends ClientConfig {
* Creates a new config instance with default group name for SeaTunnel Engine
*/
public SeaTunnelClientConfig() {
- // TODO we should get cluster name from server config instead of return a constant name.
super();
- setClusterName("SeaTunnel Engine");
+ setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 1e0db46dd..62590cbef 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -19,17 +19,9 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import com.hazelcast.core.HazelcastInstance;
-import lombok.NonNull;
-
public interface SeaTunnelClientInstance {
- /**
- * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
- * be a client, depending on the type of this
- */
- @NonNull
- HazelcastInstance getHazelcastInstance();
-
JobExecutionEnvironment createExecutionContext(String filePath, JobConfig config);
+
+ JobClient createJobClient();
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index 4221ed873..6a696d628 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -17,11 +17,8 @@
package org.apache.seatunnel.engine.client;
-import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
-import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.protocol.ClientMessage;
@@ -33,63 +30,79 @@ import com.hazelcast.logging.ILogger;
import lombok.NonNull;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
-public class SeaTunnelClient implements SeaTunnelClientInstance {
+public class SeaTunnelHazelcastClient {
private final HazelcastClientInstanceImpl hazelcastClient;
private final SerializationService serializationService;
- public SeaTunnelClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
+ public SeaTunnelHazelcastClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
Preconditions.checkNotNull(seaTunnelClientConfig, "config");
this.hazelcastClient =
- ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
+ ((HazelcastClientProxy) com.hazelcast.client.HazelcastClient.newHazelcastClient(
+ seaTunnelClientConfig)).client;
this.serializationService = hazelcastClient.getSerializationService();
ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
}
+ public SerializationService getSerializationService() {
+ return serializationService;
+ }
+
+ /**
+ * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
+ * be a client, depending on the type of this
+ */
@NonNull
- @Override
public HazelcastInstance getHazelcastInstance() {
return hazelcastClient;
}
- @Override
- public JobExecutionEnvironment createExecutionContext(@NonNull String filePath, JobConfig jobConfig) {
- JobExecutionEnvironment jobExecutionEnv = new JobExecutionEnvironment(jobConfig, filePath);
- jobExecutionEnv.addAction(jobExecutionEnv.getJobConfigParser().parse());
- return jobExecutionEnv;
- }
-
- public ILogger getLogger() {
- return hazelcastClient.getLoggingService().getLogger(getClass());
+ public ILogger getLogger(Class<?> clazz) {
+ return hazelcastClient.getLoggingService().getLogger(clazz);
}
- private <S> S invokeRequestOnMasterAndDecodeResponse(ClientMessage request,
- Function<ClientMessage, Object> decoder) {
+ public <S> S requestOnMasterAndDecodeResponse(@NonNull ClientMessage request,
+ @NonNull Function<ClientMessage, Object> decoder) {
UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
- return invokeRequestAndDecodeResponse(masterUuid, request, decoder);
+ return requestAndDecodeResponse(masterUuid, request, decoder);
}
- private <S> S invokeRequestOnAnyMemberAndDecodeResponse(ClientMessage request,
- Function<ClientMessage, Object> decoder) {
- return invokeRequestAndDecodeResponse(null, request, decoder);
+ public <S> S requestOnAnyMemberAndDecodeResponse(@NonNull ClientMessage request,
+ @NonNull Function<ClientMessage, Object> decoder) {
+ return requestAndDecodeResponse(null, request, decoder);
}
- private <S> S invokeRequestAndDecodeResponse(UUID uuid, ClientMessage request,
- Function<ClientMessage, Object> decoder) {
+ public <S> S requestAndDecodeResponse(UUID uuid, ClientMessage request,
+ Function<ClientMessage, Object> decoder) {
ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
try {
ClientMessage response = invocation.invoke().get();
return serializationService.toObject(decoder.apply(response));
+ } catch (InterruptedException i) {
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (Throwable t) {
+ throw ExceptionUtil.rethrow(t);
+ }
+ }
+
+ public CompletableFuture<Void> requestAndGetCompletableFuture(UUID uuid, ClientMessage request) {
+ ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
+ try {
+ return invocation.invoke().thenApply(c -> null);
} catch (Throwable t) {
throw ExceptionUtil.rethrow(t);
}
}
- public String printMessageToMaster(@NonNull String msg) {
- return invokeRequestOnMasterAndDecodeResponse(
- SeaTunnelPrintMessageCodec.encodeRequest(msg),
- response -> SeaTunnelPrintMessageCodec.decodeResponse(response)
- );
+ public CompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
+ UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
+ return requestAndGetCompletableFuture(masterUuid, request);
+ }
+
+ public CompletableFuture<Void> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request) {
+ return requestAndGetCompletableFuture(null, request);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index f8fd9c873..f9da29cf0 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -22,12 +22,15 @@ import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.net.URL;
import java.util.List;
+import java.util.Set;
@RunWith(JUnit4.class)
public class JobConfigParserTest {
@@ -38,7 +41,8 @@ public class JobConfigParserTest {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/fakesource_to_file.conf");
JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator());
- List<Action> actions = jobConfigParser.parse();
+ ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
+ List<Action> actions = parse.getLeft();
Assert.assertEquals(1, actions.size());
Assert.assertEquals("LocalFile", actions.get(0).getName());
@@ -55,7 +59,8 @@ public class JobConfigParserTest {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator());
- List<Action> actions = jobConfigParser.parse();
+ ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
+ List<Action> actions = parse.getLeft();
Assert.assertEquals(1, actions.size());
Assert.assertEquals("LocalFile", actions.get(0).getName());
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 856c88aec..19bcc5eb1 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -21,28 +21,37 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDagGenerator;
import com.hazelcast.internal.json.JsonObject;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.net.URL;
+import java.util.List;
+import java.util.Set;
+
@RunWith(JUnit4.class)
public class LogicalDagGeneratorTest {
@Test
public void testLogicalGenerator() {
Common.setDeployMode(DeployMode.CLIENT);
- String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+ String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setBoundedness(Boundedness.BOUNDED);
jobConfig.setName("fake_to_file");
- JobExecutionEnvironment jobExecutionEnv = new JobExecutionEnvironment(jobConfig, filePath);
- jobExecutionEnv.addAction(jobExecutionEnv.getJobConfigParser().parse());
- LogicalDagGenerator logicalDagGenerator = jobExecutionEnv.getLogicalDagGenerator();
+ IdGenerator idGenerator = new IdGenerator();
+ ImmutablePair<List<Action>, Set<URL>> immutablePair = new JobConfigParser(filePath, idGenerator).parse();
+
+ LogicalDagGenerator logicalDagGenerator =
+ new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);
LogicalDag logicalDag = logicalDagGenerator.generate();
JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
String result =
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 0870fa08c..51fd02134 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import com.google.common.collect.Lists;
@@ -37,14 +42,15 @@ public class SeaTunnelClientTest {
Config config = new Config();
config.getSecurityConfig().setEnabled(false);
config.getJetConfig().setEnabled(false);
+ config.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
config.getNetworkConfig().setPort(50001);
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
+ HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(),
+ new SeaTunnelNodeContext());
}
@Test
public void testSayHello() {
SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
- seaTunnelClientConfig.setClusterName("dev");
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
@@ -52,4 +58,20 @@ public class SeaTunnelClientTest {
String s = engineClient.printMessageToMaster(msg);
Assert.assertEquals(msg, s);
}
+
+ @Test
+ public void testExecuteJob() {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setBoundedness(Boundedness.BOUNDED);
+ jobConfig.setName("fake_to_file");
+
+ SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
+ seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
+ JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
+
+ JobProxy jobProxy = jobExecutionEnv.execute();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index cb0795ee4..615b6ca48 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -19,4 +19,8 @@ package org.apache.seatunnel.engine.common;
public class Constant {
public static final String SEATUNNEL_SERVICE_NAME = "st:impl:seaTunnelServer";
+
+ public static final String SEATUNNEL_ID_GENERATOR_NAME = "SeaTunnelIdGenerator";
+
+ public static final String DEFAULT_SEATUNNEL_CLUSTER_NAME = "SeaTunnel";
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
index a5758a515..120b9b037 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/serializeable/ConfigDataSerializerHook.java
@@ -17,9 +17,10 @@
package org.apache.seatunnel.engine.common.serializeable;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
-import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.nio.serialization.DataSerializableFactory;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index af9b504b7..7e1ed660e 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.common.utils;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
import org.apache.seatunnel.engine.common.exception.JobNotFoundExceptionSeaTunnel;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
@@ -36,7 +37,8 @@ public final class ExceptionUtil {
private static final List<ImmutableTriple<Integer, Class<? extends Throwable>, ClientExceptionFactory.ExceptionFactory>> EXCEPTIONS = Arrays.asList(
new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START, SeaTunnelEngineException.class, SeaTunnelEngineException::new),
- new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundExceptionSeaTunnel.class, JobNotFoundExceptionSeaTunnel::new)
+ new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundExceptionSeaTunnel.class, JobNotFoundExceptionSeaTunnel::new),
+ new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 2, JobDefineCheckExceptionSeaTunnel.class, JobDefineCheckExceptionSeaTunnel::new)
);
private ExceptionUtil() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index d85b2873f..0ede9cc6d 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.core.dag.actions;
import lombok.NonNull;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -30,15 +31,19 @@ public abstract class AbstractAction implements Action {
private int parallelism = 1;
- protected AbstractAction(int id, @NonNull String name, @NonNull List<Action> upstreams) {
+ private List<URL> jarUrls;
+
+ protected AbstractAction(int id, @NonNull String name, @NonNull List<Action> upstreams, @NonNull List<URL> jarUrls) {
this.id = id;
this.name = name;
this.upstreams = upstreams;
+ this.jarUrls = jarUrls;
}
- protected AbstractAction(int id, @NonNull String name) {
+ protected AbstractAction(int id, @NonNull String name, @NonNull List<URL> jarUrls) {
this.id = id;
this.name = name;
+ this.jarUrls = jarUrls;
}
@NonNull
@@ -77,4 +82,9 @@ public abstract class AbstractAction implements Action {
public int getId() {
return id;
}
+
+ @Override
+ public List<URL> getJarUrls() {
+ return jarUrls;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index 86e39c62d..6abe84bf3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.core.dag.actions;
import lombok.NonNull;
import java.io.Serializable;
+import java.net.URL;
import java.util.List;
public interface Action extends Serializable {
@@ -38,4 +39,6 @@ public interface Action extends Serializable {
void setParallelism(int parallelism);
int getId();
+
+ List<URL> getJarUrls();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
index 25377c4bd..8806978a7 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
import lombok.NonNull;
+import java.net.URL;
import java.util.List;
public class PartitionTransformAction extends AbstractAction {
@@ -29,15 +30,17 @@ public class PartitionTransformAction extends AbstractAction {
public PartitionTransformAction(int id,
@NonNull String name,
@NonNull List<Action> upstreams,
- @NonNull PartitionSeaTunnelTransform partitionTransformation) {
- super(id, name, upstreams);
+ @NonNull PartitionSeaTunnelTransform partitionTransformation,
+ @NonNull List<URL> jarUrls) {
+ super(id, name, upstreams, jarUrls);
this.partitionTransformation = partitionTransformation;
}
public PartitionTransformAction(int id,
@NonNull String name,
- @NonNull PartitionSeaTunnelTransform partitionTransformation) {
- super(id, name);
+ @NonNull PartitionSeaTunnelTransform partitionTransformation,
+ @NonNull List<URL> jarUrls) {
+ super(id, name, jarUrls);
this.partitionTransformation = partitionTransformation;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
index 8dac82faf..47c387dcd 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import lombok.NonNull;
+import java.net.URL;
import java.util.List;
@SuppressWarnings("checkstyle:ClassTypeParameterName")
@@ -30,15 +31,17 @@ public class SinkAction<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends
public SinkAction(int id,
@NonNull String name,
@NonNull List<Action> upstreams,
- @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink) {
- super(id, name, upstreams);
+ @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
+ @NonNull List<URL> jarUrls) {
+ super(id, name, upstreams, jarUrls);
this.sink = sink;
}
public SinkAction(int id,
@NonNull String name,
- @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink) {
- super(id, name);
+ @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
+ @NonNull List<URL> jarUrls) {
+ super(id, name, jarUrls);
this.sink = sink;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index 5599b0e5d..a5f834fe3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -24,14 +24,17 @@ import com.google.common.collect.Lists;
import lombok.NonNull;
import java.io.Serializable;
+import java.net.URL;
+import java.util.List;
public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializable> extends AbstractAction {
private SeaTunnelSource<T, SplitT, StateT> source;
public SourceAction(int id,
@NonNull String name,
- @NonNull SeaTunnelSource<T, SplitT, StateT> source) {
- super(id, name, Lists.newArrayList());
+ @NonNull SeaTunnelSource<T, SplitT, StateT> source,
+ @NonNull List<URL> jarUrls) {
+ super(id, name, Lists.newArrayList(), jarUrls);
this.source = source;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index afea5986d..c4385a064 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import lombok.NonNull;
+import java.net.URL;
import java.util.List;
public class TransformAction extends AbstractAction {
@@ -29,15 +30,17 @@ public class TransformAction extends AbstractAction {
public TransformAction(int id,
@NonNull String name,
@NonNull List<Action> upstreams,
- @NonNull SeaTunnelTransform transformation) {
- super(id, name, upstreams);
+ @NonNull SeaTunnelTransform transformation,
+ @NonNull List<URL> jarUrls) {
+ super(id, name, upstreams, jarUrls);
this.transformation = transformation;
}
public TransformAction(int id,
@NonNull String name,
- @NonNull SeaTunnelTransform transformation) {
- super(id, name);
+ @NonNull SeaTunnelTransform transformation,
+ @NonNull List<URL> jarUrls) {
+ super(id, name, jarUrls);
this.transformation = transformation;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
similarity index 84%
copy from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index cb0795ee4..26b2e76ff 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -15,8 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.common;
+package org.apache.seatunnel.engine.core.job;
-public class Constant {
- public static final String SEATUNNEL_SERVICE_NAME = "st:impl:seaTunnelServer";
+public interface Job {
+ long getJobId();
+
+ void submitJob();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
new file mode 100644
index 000000000..577d52e0c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.job;
+
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
+
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+
+public class JobImmutableInformation implements IdentifiedDataSerializable {
+ private long jobId;
+
+ private long createTime;
+
+ private Data logicalDag;
+
+ private JobConfig jobConfig;
+
+ private List<URL> pluginJarsUrls;
+
+ public JobImmutableInformation() {
+ }
+
+ public JobImmutableInformation(long jobId, @NonNull Data logicalDag, @NonNull JobConfig jobConfig, @NonNull List<URL> pluginJarsUrls) {
+ this.createTime = System.currentTimeMillis();
+ this.jobId = jobId;
+ this.logicalDag = logicalDag;
+ this.jobConfig = jobConfig;
+ this.pluginJarsUrls = pluginJarsUrls;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ public Data getLogicalDag() {
+ return logicalDag;
+ }
+
+ public JobConfig getJobConfig() {
+ return jobConfig;
+ }
+
+ public List<URL> getPluginJarsUrls() {
+ return pluginJarsUrls;
+ }
+
+ @Override
+ public int getFactoryId() {
+ return JobDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return JobDataSerializerHook.JOB_IMMUTABLE_INFORMATION;
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException {
+ out.writeLong(jobId);
+ out.writeLong(createTime);
+ IOUtil.writeData(out, logicalDag);
+ out.writeObject(jobConfig);
+ out.writeObject(pluginJarsUrls);
+
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException {
+ jobId = in.readLong();
+ createTime = in.readLong();
+ logicalDag = IOUtil.readData(in);
+ jobConfig = in.readObject();
+ pluginJarsUrls = in.readObject();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
index 75060fc90..ec10e3d8a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
@@ -29,13 +29,14 @@ import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
-/**
+/*
* This file is auto-generated by the Hazelcast Client Protocol Code Generator.
* To change this file, edit the templates or the protocol
* definitions on the https://github.com/hazelcast/hazelcast-client-protocol
* and regenerate it.
*/
-@Generated("4d1cbb254e8eaad3c45fe22c57f29492")
+
+@Generated("c0a6d0c9d7eb912e8b10861931a0a695")
public final class SeaTunnelPrintMessageCodec {
//hex: 0xDE0100
public static final int REQUEST_MESSAGE_TYPE = 14549248;
@@ -51,7 +52,8 @@ public final class SeaTunnelPrintMessageCodec {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(false);
clientMessage.setOperationName("SeaTunnel.PrintMessage");
- ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ ClientMessage.Frame initialFrame =
+ new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
clientMessage.add(initialFrame);
@@ -71,7 +73,8 @@ public final class SeaTunnelPrintMessageCodec {
public static ClientMessage encodeResponse(java.lang.String response) {
ClientMessage clientMessage = ClientMessage.createForEncode();
- ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ ClientMessage.Frame initialFrame =
+ new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
clientMessage.add(initialFrame);
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
similarity index 67%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
index 75060fc90..36ec48de4 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
@@ -27,65 +27,57 @@ import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCod
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
-import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
+import com.hazelcast.client.impl.protocol.codec.builtin.DataCodec;
-/**
+/*
* This file is auto-generated by the Hazelcast Client Protocol Code Generator.
* To change this file, edit the templates or the protocol
* definitions on the https://github.com/hazelcast/hazelcast-client-protocol
* and regenerate it.
*/
-@Generated("4d1cbb254e8eaad3c45fe22c57f29492")
-public final class SeaTunnelPrintMessageCodec {
- //hex: 0xDE0100
- public static final int REQUEST_MESSAGE_TYPE = 14549248;
- //hex: 0xDE0101
- public static final int RESPONSE_MESSAGE_TYPE = 14549249;
+
+@Generated("f0fdf747fe01901765dedbe5a527bb0f")
+public final class SeaTunnelSubmitJobCodec {
+ //hex: 0xDE0200
+ public static final int REQUEST_MESSAGE_TYPE = 14549504;
+ //hex: 0xDE0201
+ public static final int RESPONSE_MESSAGE_TYPE = 14549505;
private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
- private SeaTunnelPrintMessageCodec() {
+ private SeaTunnelSubmitJobCodec() {
}
- public static ClientMessage encodeRequest(java.lang.String message) {
+ public static ClientMessage encodeRequest(com.hazelcast.internal.serialization.Data jobImmutableInformation) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(false);
- clientMessage.setOperationName("SeaTunnel.PrintMessage");
- ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ clientMessage.setOperationName("SeaTunnel.SubmitJob");
+ ClientMessage.Frame initialFrame =
+ new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
clientMessage.add(initialFrame);
- StringCodec.encode(clientMessage, message);
+ DataCodec.encode(clientMessage, jobImmutableInformation);
return clientMessage;
}
/**
*
*/
- public static java.lang.String decodeRequest(ClientMessage clientMessage) {
+ public static com.hazelcast.internal.serialization.Data decodeRequest(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
//empty initial frame
iterator.next();
- return StringCodec.decode(iterator);
+ return DataCodec.decode(iterator);
}
- public static ClientMessage encodeResponse(java.lang.String response) {
+ public static ClientMessage encodeResponse() {
ClientMessage clientMessage = ClientMessage.createForEncode();
- ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ ClientMessage.Frame initialFrame =
+ new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
clientMessage.add(initialFrame);
- StringCodec.encode(clientMessage, response);
return clientMessage;
}
-
- /**
- *
- */
- public static java.lang.String decodeResponse(ClientMessage clientMessage) {
- ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
- //empty initial frame
- iterator.next();
- return StringCodec.decode(iterator);
- }
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
index 2a98be5f0..b6da4a3df 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/serializable/JobDataSerializerHook.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConsta
import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalEdge;
import org.apache.seatunnel.engine.core.dag.logicaldag.LogicalVertex;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -51,6 +52,11 @@ public final class JobDataSerializerHook implements DataSerializerHook {
*/
public static final int LOGICAL_EDGE = 2;
+ /**
+ * Serialization ID of the {@link org.apache.seatunnel.engine.core.job.JobImmutableInformation} class.
+ */
+ public static final int JOB_IMMUTABLE_INFORMATION = 3;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_JOB_DATA_SERIALIZER_FACTORY_ID
@@ -77,6 +83,8 @@ public final class JobDataSerializerHook implements DataSerializerHook {
return new LogicalVertex();
case LOGICAL_EDGE:
return new LogicalEdge();
+ case JOB_IMMUTABLE_INFORMATION:
+ return new JobImmutableInformation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index a16ea482f..fb4b29e2b 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -39,3 +39,18 @@ methods:
nullable: false
since: 2.0
doc: ''
+
+ - id: 2
+ name: submitJob
+ since: 2.0
+ doc: ''
+ request:
+ retryable: false
+ partitionIdentifier: -1
+ params:
+ - name: jobImmutableInformation
+ type: Data
+ nullable: false
+ since: 2.0
+ doc: ''
+ response: {}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 51a36f373..42c3cb157 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,10 +17,14 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+
import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.MembershipAwareService;
import com.hazelcast.internal.services.MembershipServiceEvent;
+import com.hazelcast.jet.impl.LiveOperationRegistry;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -28,17 +32,20 @@ import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
private NodeEngineImpl nodeEngine;
private final ILogger logger;
+ private final LiveOperationRegistry liveOperationRegistry;
private TaskExecutionService taskExecutionService;
public SeaTunnelServer(Node node) {
this.logger = node.getLogger(getClass());
+ this.liveOperationRegistry = new LiveOperationRegistry();
logger.info("SeaTunnel server start...");
}
@@ -82,9 +89,35 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
/**
* Used for debugging on call
*/
- public String printMessage(String message){
+ public String printMessage(String message) {
this.logger.info(nodeEngine.getThisAddress() + ":" + message);
return message;
}
+ public LiveOperationRegistry getLiveOperationRegistry() {
+ return liveOperationRegistry;
+ }
+
+ /**
+ * call by client to submit job
+ */
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public CompletableFuture<Void> submitJob(Data jobImmutableInformation) {
+ // TODO Here we need new a JobMaster and run it.
+ JobImmutableInformation jobInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformation);
+ logger.info("Job [" + jobInformation.getJobId() + "] submit");
+ logger.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
+ CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(2000);
+ logger.info("I am sleep 2000 ms");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ voidCompletableFuture.complete(null);
+ }
+ }).start();
+ return voidCompletableFuture;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index e40e5d727..49db3f60c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.engine.common.Constant;
+
import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
@@ -26,6 +28,7 @@ public class SeaTunnelServerStarter {
Config config = new Config();
config.getSecurityConfig().setEnabled(false);
config.getJetConfig().setEnabled(false);
+ config.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
new file mode 100644
index 000000000..21d6dc8b8
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import static com.hazelcast.jet.impl.util.ExceptionUtil.isRestartableException;
+import static com.hazelcast.jet.impl.util.ExceptionUtil.peel;
+import static com.hazelcast.jet.impl.util.ExceptionUtil.stackTraceToString;
+import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static com.hazelcast.spi.impl.operationservice.ExceptionAction.THROW_EXCEPTION;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.jet.JetException;
+import com.hazelcast.nio.serialization.HazelcastSerializationException;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.ExceptionAction;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for async operations. Handles registration/deregistration of
+ * operations from live registry, exception handling and peeling and
+ * logging of exceptions
+ */
+public abstract class AsyncOperation extends Operation implements IdentifiedDataSerializable {
+
+ @Override
+ public void beforeRun() {
+ SeaTunnelServer service = getService();
+ service.getLiveOperationRegistry().register(this);
+ }
+
+ @Override
+ public final void run() {
+ CompletableFuture<?> future;
+ try {
+ future = doRun();
+ } catch (Exception e) {
+ logError(e);
+ doSendResponse(e);
+ return;
+ }
+ future.whenComplete(withTryCatch(getLogger(), (r, f) -> doSendResponse(f != null ? peel(f) : r)));
+ }
+
+ protected abstract CompletableFuture<?> doRun() throws Exception;
+
+ @Override
+ public final boolean returnsResponse() {
+ return false;
+ }
+
+ @Override
+ public final Object getResponse() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void doSendResponse(Object value) {
+ try {
+ final SeaTunnelServer service = getService();
+ service.getLiveOperationRegistry().deregister(this);
+ } finally {
+ try {
+ sendResponse(value);
+ } catch (Exception e) {
+ Throwable ex = peel(e);
+ if (value instanceof Throwable && ex instanceof HazelcastSerializationException) {
+ // Sometimes exceptions are not serializable, for example on
+ // https://github.com/hazelcast/hazelcast-jet/issues/1995.
+ // When sending exception as a response and the serialization fails,
+ // the response will not be sent and the operation will hang.
+ // To prevent this from happening, replace the exception with
+ // another exception that can be serialized.
+ sendResponse(new JetException(stackTraceToString(ex)));
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Override
+ public ExceptionAction onInvocationException(Throwable throwable) {
+ return isRestartableException(throwable) ? THROW_EXCEPTION : super.onInvocationException(throwable);
+ }
+
+ @Override
+ public final int getFactoryId() {
+ return OperationDataSerializerHook.FACTORY_ID;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
new file mode 100644
index 000000000..689f697be
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class SubmitJobOperation extends AsyncOperation {
+ private Data jobImmutableInformation;
+
+ public SubmitJobOperation() {
+ }
+
+ public SubmitJobOperation(Data jobImmutableInformation) {
+ this.jobImmutableInformation = jobImmutableInformation;
+ }
+
+ @Override
+ public int getClassId() {
+ return OperationDataSerializerHook.SUBMIT_OPERATOR;
+ }
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ IOUtil.writeData(out, jobImmutableInformation);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ jobImmutableInformation = IOUtil.readData(in);
+ }
+
+ @Override
+ protected CompletableFuture<?> doRun() throws Exception {
+ SeaTunnelServer seaTunnelServer = getService();
+ CompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
+ return voidCompletableFuture;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
index 78dee775e..33c829f75 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
@@ -17,58 +17,46 @@
package org.apache.seatunnel.engine.server.protocol.task;
-import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
-import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import com.google.common.base.Function;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask;
import com.hazelcast.cluster.Address;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.security.Permission;
+import java.util.function.Function;
-public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
+abstract class AbstractSeaTunnelMessageTask<P, R> extends AbstractInvocationMessageTask<P> {
+ private final Function<ClientMessage, P> decoder;
+ private final Function<R, ClientMessage> encoder;
- protected PrintMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
+ protected AbstractSeaTunnelMessageTask(ClientMessage clientMessage, Node node, Connection connection,
+ Function<ClientMessage, P> decoder, Function<R, ClientMessage> encoder) {
super(clientMessage, node, connection);
- }
- @Override
- protected InvocationBuilder getInvocationBuilder(Operation op) {
- Address masterAddress = nodeEngine.getMasterAddress();
- if (masterAddress == null) {
- throw new RetryableHazelcastException("master not yet known");
- }
- return nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- op, masterAddress);
- }
-
- @Override
- protected Operation prepareOperation() {
- return new PrintMessageOperation(parameters);
+ this.decoder = decoder;
+ this.encoder = encoder;
}
@Override
- protected String decodeClientMessage(ClientMessage clientMessage) {
- Function<ClientMessage, String> decodeRequest = SeaTunnelPrintMessageCodec::decodeRequest;
- return decodeRequest.apply(clientMessage);
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
}
@Override
- protected ClientMessage encodeResponse(Object response) {
- Function<String, ClientMessage> encodeResponse = SeaTunnelPrintMessageCodec::encodeResponse;
- return encodeResponse.apply((String) response);
+ protected final P decodeClientMessage(ClientMessage clientMessage) {
+ return decoder.apply(clientMessage);
}
@Override
- public String getServiceName() {
- return Constant.SEATUNNEL_SERVICE_NAME;
+ protected ClientMessage encodeResponse(Object o) {
+ return encoder.apply((R) o);
}
@Override
@@ -81,13 +69,21 @@ public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
return null;
}
- @Override
- public String getMethodName() {
- return "printMessage";
+ protected <V> Data toData(V v) {
+ return nodeEngine.getSerializationService().toData(v);
}
@Override
- public Object[] getParameters() {
- return new Object[0];
+ protected InvocationBuilder getInvocationBuilder(Operation operation) {
+ Address masterAddress = nodeEngine.getMasterAddress();
+ if (masterAddress == null) {
+ throw new RetryableHazelcastException("master not yet known");
+ }
+ return nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME,
+ operation, masterAddress);
+ }
+
+ protected SeaTunnelServer getSeaTunnelService() {
+ return getService(SeaTunnelServer.SERVICE_NAME);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
index 78dee775e..a39dfeeeb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
@@ -17,36 +17,20 @@
package org.apache.seatunnel.engine.server.protocol.task;
-import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
-import com.google.common.base.Function;
import com.hazelcast.client.impl.protocol.ClientMessage;
-import com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask;
-import com.hazelcast.cluster.Address;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.nio.Connection;
-import com.hazelcast.spi.exception.RetryableHazelcastException;
-import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
import com.hazelcast.spi.impl.operationservice.Operation;
-import java.security.Permission;
-
-public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
+public class PrintMessageTask extends AbstractSeaTunnelMessageTask<String, String> {
protected PrintMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
- super(clientMessage, node, connection);
- }
-
- @Override
- protected InvocationBuilder getInvocationBuilder(Operation op) {
- Address masterAddress = nodeEngine.getMasterAddress();
- if (masterAddress == null) {
- throw new RetryableHazelcastException("master not yet known");
- }
- return nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- op, masterAddress);
+ super(clientMessage, node, connection,
+ SeaTunnelPrintMessageCodec::decodeRequest,
+ SeaTunnelPrintMessageCodec::encodeResponse);
}
@Override
@@ -54,33 +38,6 @@ public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
return new PrintMessageOperation(parameters);
}
- @Override
- protected String decodeClientMessage(ClientMessage clientMessage) {
- Function<ClientMessage, String> decodeRequest = SeaTunnelPrintMessageCodec::decodeRequest;
- return decodeRequest.apply(clientMessage);
- }
-
- @Override
- protected ClientMessage encodeResponse(Object response) {
- Function<String, ClientMessage> encodeResponse = SeaTunnelPrintMessageCodec::encodeResponse;
- return encodeResponse.apply((String) response);
- }
-
- @Override
- public String getServiceName() {
- return Constant.SEATUNNEL_SERVICE_NAME;
- }
-
- @Override
- public Permission getRequiredPermission() {
- return null;
- }
-
- @Override
- public String getDistributedObjectName() {
- return null;
- }
-
@Override
public String getMethodName() {
return "printMessage";
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 9b25ecd32..2b6c78488 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.protocol.task;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import com.hazelcast.client.impl.protocol.MessageTaskFactory;
import com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider;
@@ -42,5 +43,6 @@ public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryPr
private void initFactories() {
factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
+ factories.put(SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
index 9b25ecd32..5cb228b3e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
@@ -17,30 +17,35 @@
package org.apache.seatunnel.engine.server.protocol.task;
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
+import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
-import com.hazelcast.client.impl.protocol.MessageTaskFactory;
-import com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider;
+import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.instance.impl.Node;
-import com.hazelcast.internal.util.collection.Int2ObjectHashMap;
-import com.hazelcast.spi.impl.NodeEngine;
-import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.spi.impl.operationservice.Operation;
-public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryProvider {
- private final Int2ObjectHashMap<MessageTaskFactory> factories = new Int2ObjectHashMap<>(60);
- public final Node node;
+public class SubmitJobTask extends AbstractSeaTunnelMessageTask<Data, Void> {
- public SeaTunnelMessageTaskFactoryProvider(NodeEngine nodeEngine) {
- this.node = ((NodeEngineImpl) nodeEngine).getNode();
- initFactories();
+ protected SubmitJobTask(ClientMessage clientMessage, Node node, Connection connection) {
+ super(clientMessage, node, connection,
+ SeaTunnelSubmitJobCodec::decodeRequest,
+ x -> SeaTunnelSubmitJobCodec.encodeResponse());
}
@Override
- public Int2ObjectHashMap<MessageTaskFactory> getFactories() {
- return this.factories;
+ protected Operation prepareOperation() {
+ return new SubmitJobOperation(parameters);
}
- private void initFactories() {
- factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
+ @Override
+ public String getMethodName() {
+ return "submitJob";
+ }
+
+ @Override
+ public Object[] getParameters() {
+ return new Object[]{};
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index f56a68353..3153413d4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -34,6 +35,7 @@ import com.hazelcast.spi.annotation.PrivateApi;
@PrivateApi
public final class OperationDataSerializerHook implements DataSerializerHook {
public static final int PRINT_MESSAGE_OPERATOR = 0;
+ public static final int SUBMIT_OPERATOR = 1;
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -57,6 +59,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
switch (typeId) {
case PRINT_MESSAGE_OPERATOR:
return new PrintMessageOperation();
+ case SUBMIT_OPERATOR:
+ return new SubmitJobOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 529049455..baacf1636 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -64,7 +64,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
};
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
- new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+ new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
public AbstractPluginDiscovery(String pluginSubDir, BiConsumer<ClassLoader, URL> addURLToClassloader) {
this.pluginDir = Common.connectorJarDir(pluginSubDir);
@@ -80,10 +80,10 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
@Override
public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
return pluginIdentifiers.stream()
- .map(this::getPluginJarPath)
- .filter(Optional::isPresent)
- .map(Optional::get).distinct()
- .collect(Collectors.toList());
+ .map(this::getPluginJarPath)
+ .filter(Optional::isPresent)
+ .map(Optional::get).distinct()
+ .collect(Collectors.toList());
}
@Override
@@ -109,13 +109,14 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
this.addURLToClassLoader.accept(classLoader, pluginJarPath.get());
} catch (Exception e) {
LOGGER.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
- " message: " + e.getMessage());
- classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
+ " message: " + e.getMessage());
+ classLoader =
+ new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
}
pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
LOGGER.info("Load plugin: {} from path: {} use classloader: {}",
- pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
+ pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
return pluginInstance;
}
}
@@ -135,7 +136,8 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
} else if (t instanceof PluginIdentifierInterface) {
// new api
PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
- if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
+ if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(),
+ pluginIdentifier.getPluginName())) {
return (T) pluginIdentifierInstance;
}
} else {
@@ -193,7 +195,8 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
File[] targetPluginFiles = pluginDir.toFile().listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
- return pathname.getName().endsWith(".jar") && StringUtils.startsWithIgnoreCase(pathname.getName(), pluginJarPrefix);
+ return pathname.getName().endsWith(".jar") &&
+ StringUtils.startsWithIgnoreCase(pathname.getName(), pluginJarPrefix);
}
});
if (ArrayUtils.isEmpty(targetPluginFiles)) {