You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/12 06:49:00 UTC
[incubator-seatunnel] branch st-engine updated: [ST-Engine] Add PipelineBaseScheduler For Engine (#2396)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 9fe2d3def [ST-Engine] Add PipelineBaseScheduler For Engine (#2396)
9fe2d3def is described below
commit 9fe2d3def324cad7b37db9608ceebd2f07b13241
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Aug 12 14:48:53 2022 +0800
[ST-Engine] Add PipelineBaseScheduler For Engine (#2396)
* Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
* add source file to licenserc.yaml
* fix checkstyle
* fix error
* tmp
* tmp
* Update PhysicalPlan to support scheduler by pipeline
* remove init in run
* tmp
* complete scheduler
* optimize scheduler
* fix checkstyle
* fix checkstyle
* Update seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
Co-authored-by: Hisoka <fa...@qq.com>
* Update SeaTunnelStarter.java
Co-authored-by: Hisoka <fa...@qq.com>
---
.../seatunnel/common/utils/ExceptionUtils.java | 35 ++---
.../core/starter/seatunnel/SeaTunnelStarter.java | 7 +-
.../seatunnel/engine/client/JobConfigParser.java | 10 +-
.../engine/client/JobExecutionEnvironment.java | 3 +-
.../apache/seatunnel/engine/client/JobProxy.java | 12 +-
.../engine/client/SeaTunnelHazelcastClient.java | 7 +-
.../engine/client/SeaTunnelClientTest.java | 11 +-
.../exception/JobNoEnoughResourceException.java} | 25 +--
.../seatunnel/engine/common/utils/IdGenerator.java | 4 +-
.../engine/common/utils/NonCompletableFuture.java | 2 +-
.../engine/core/dag/actions/AbstractAction.java | 8 +-
.../seatunnel/engine/core/dag/actions/Action.java | 2 +-
.../core/dag/actions/PartitionTransformAction.java | 4 +-
.../core/dag/actions/PhysicalSourceAction.java | 4 +-
.../engine/core/dag/actions/SinkAction.java | 4 +-
.../engine/core/dag/actions/SourceAction.java | 2 +-
.../engine/core/dag/actions/TransformAction.java | 4 +-
.../core/dag/actions/TransformChainAction.java | 4 +-
.../core/dag/internal/IntermediateDataQueue.java | 6 +-
.../engine/core/dag/logical/LogicalDag.java | 10 +-
.../core/dag/logical/LogicalDagGenerator.java | 10 +-
.../engine/core/dag/logical/LogicalEdge.java | 14 +-
.../engine/core/dag/logical/LogicalVertex.java | 6 +-
.../org/apache/seatunnel/engine/core/job/Job.java | 4 +-
.../seatunnel/engine/core/job/PipelineState.java | 2 +-
.../seatunnel/engine/server/SeaTunnelServer.java | 13 +-
.../engine/server/TaskExecutionService.java | 44 ++++--
.../engine/server/dag/execution/ExecutionEdge.java | 4 +-
.../dag/execution/ExecutionPlanGenerator.java | 8 +-
.../server/dag/execution/ExecutionVertex.java | 2 +-
.../engine/server/dag/execution/Pipeline.java | 6 +-
.../server/dag/execution/PipelineGenerator.java | 2 +-
.../engine/server/dag/physical/PhysicalPlan.java | 53 +++++--
.../server/dag/physical/PhysicalPlanGenerator.java | 37 +++--
.../engine/server/dag/physical/PhysicalVertex.java | 171 +++++++++++++++++----
.../engine/server/dag/physical/SubPlan.java | 113 ++++++++++++--
.../engine/server/execution/ExecutionState.java | 22 ++-
.../seatunnel/engine/server/master/JobMaster.java | 60 ++++++--
.../engine/server/operation/AsyncOperation.java | 11 +-
...tJobOperation.java => DeployTaskOperation.java} | 36 +++--
.../server/operation/SubmitJobOperation.java | 9 +-
.../ResourceManager.java} | 12 +-
.../resourcemanager/SimpleResourceManager.java | 71 +++++++++
.../engine/server/scheduler/JobScheduler.java | 2 -
.../server/scheduler/PipelineBaseScheduler.java | 121 +++++++++++++++
.../serializable/OperationDataSerializerHook.java | 4 +
.../serializable/TaskDataSerializerHook.java | 4 +-
...nfo.java => TaskGroupImmutableInformation.java} | 24 ++-
48 files changed, 772 insertions(+), 257 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
similarity index 55%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
copy to seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
index b7def66ad..c4d2d690a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
@@ -15,27 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.dag.execution;
+package org.apache.seatunnel.common.utils;
-import java.util.List;
-import java.util.Map;
+import java.io.PrintWriter;
+import java.io.StringWriter;
-public class Pipeline {
- private final List<ExecutionEdge> edges;
-
- private final Map<Integer, ExecutionVertex> vertexes;
-
- Pipeline(List<ExecutionEdge> edges, Map<Integer, ExecutionVertex> vertexes) {
- this.edges = edges;
- this.vertexes = vertexes;
+public class ExceptionUtils {
+ private ExceptionUtils() {
}
- public List<ExecutionEdge> getEdges() {
- return edges;
+ public static String getMessage(Throwable e) {
+ try (StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw)) {
+ // Output the error stack information to the printWriter
+ e.printStackTrace(pw);
+ pw.flush();
+ sw.flush();
+ return sw.toString();
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ throw new RuntimeException("Failed to print exception logs", e1);
+ }
}
-
- public Map<Integer, ExecutionVertex> getVertexes() {
- return vertexes;
- }
-
}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
index 6c3372ad7..ef202fd97 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
+++ b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
import com.hazelcast.client.config.ClientConfig;
import java.nio.file.Path;
+import java.util.concurrent.ExecutionException;
public class SeaTunnelStarter {
public static void main(String[] args) {
@@ -43,7 +44,11 @@ public class SeaTunnelStarter {
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
- JobProxy jobProxy = jobExecutionEnv.execute();
+ try {
+ JobProxy jobProxy = jobExecutionEnv.execute();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
// TODO wait for job complete and then exit
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
index 58b2ee3ad..843296bed 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -312,7 +312,7 @@ public class JobConfigParser {
return 1;
}
- private SourceAction createSourceAction(int id,
+ private SourceAction createSourceAction(long id,
@NonNull String name,
@NonNull SeaTunnelSource source,
List<URL> jarUrls) {
@@ -322,7 +322,7 @@ public class JobConfigParser {
return new SourceAction(id, name, source, jarUrls);
}
- private TransformAction createTransformAction(int id,
+ private TransformAction createTransformAction(long id,
@NonNull String name,
@NonNull List<Action> upstreams,
@NonNull SeaTunnelTransform transformation,
@@ -333,7 +333,7 @@ public class JobConfigParser {
return new TransformAction(id, name, upstreams, transformation, jarUrls);
}
- private SinkAction createSinkAction(int id,
+ private SinkAction createSinkAction(long id,
@NonNull String name,
@NonNull List<Action> upstreams,
@NonNull SeaTunnelSink sink,
@@ -344,7 +344,7 @@ public class JobConfigParser {
return new SinkAction(id, name, upstreams, sink, jarUrls);
}
- private TransformAction createTransformAction(int id,
+ private TransformAction createTransformAction(long id,
@NonNull String name,
@NonNull SeaTunnelTransform transformation,
List<URL> jarUrls) {
@@ -354,7 +354,7 @@ public class JobConfigParser {
return new TransformAction(id, name, transformation, jarUrls);
}
- private SinkAction createSinkAction(int id,
+ private SinkAction createSinkAction(long id,
@NonNull String name,
@NonNull SeaTunnelSink sink,
List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index 94b965460..81a278111 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -31,6 +31,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
public class JobExecutionEnvironment {
@@ -74,7 +75,7 @@ public class JobExecutionEnvironment {
return actions;
}
- public JobProxy execute() {
+ public JobProxy execute() throws ExecutionException, InterruptedException {
JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
jobClient.getNewJobId(),
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
index ab45b5cfa..84b80770d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
@@ -17,16 +17,20 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
import lombok.NonNull;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
public class JobProxy implements Job {
+ private static final ILogger LOGGER = Logger.getLogger(JobProxy.class);
private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
private JobImmutableInformation jobImmutableInformation;
@@ -42,11 +46,11 @@ public class JobProxy implements Job {
}
@Override
- public void submitJob() {
+ public void submitJob() throws ExecutionException, InterruptedException {
ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(
seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
- CompletableFuture<Void> voidCompletableFuture =
+ NonCompletableFuture<Void> submitJobFuture =
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
- voidCompletableFuture.join();
+ submitJobFuture.get();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index 369d290bb..c22faeb1c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
@@ -89,16 +90,16 @@ public class SeaTunnelHazelcastClient {
}
}
- public CompletableFuture<Void> requestAndGetCompletableFuture(UUID uuid, ClientMessage request) {
+ public NonCompletableFuture<Void> requestAndGetCompletableFuture(UUID uuid, ClientMessage request) {
ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
try {
- return invocation.invoke().thenApply(c -> null);
+ return new NonCompletableFuture<>(invocation.invoke().thenApply(c -> null));
} catch (Throwable t) {
throw ExceptionUtil.rethrow(t);
}
}
- public CompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
+ public NonCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
return requestAndGetCompletableFuture(masterUuid, request);
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index ee9e2533a..7c97ed7fc 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -35,6 +35,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.util.concurrent.ExecutionException;
+
@SuppressWarnings("checkstyle:MagicNumber")
@RunWith(JUnit4.class)
public class SeaTunnelClientTest {
@@ -70,7 +72,14 @@ public class SeaTunnelClientTest {
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
- JobProxy jobProxy = jobExecutionEnv.execute();
+ JobProxy jobProxy = null;
+ try {
+ jobProxy = jobExecutionEnv.execute();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Assert.assertNotNull(jobProxy);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
similarity index 59%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
index b7def66ad..9a0b89565 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
@@ -15,27 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.dag.execution;
+package org.apache.seatunnel.engine.common.exception;
-import java.util.List;
-import java.util.Map;
-
-public class Pipeline {
- private final List<ExecutionEdge> edges;
-
- private final Map<Integer, ExecutionVertex> vertexes;
-
- Pipeline(List<ExecutionEdge> edges, Map<Integer, ExecutionVertex> vertexes) {
- this.edges = edges;
- this.vertexes = vertexes;
+public class JobNoEnoughResourceException extends SeaTunnelEngineException {
+ public JobNoEnoughResourceException(String jobName, long jobId, int pipelineIndex, int totalPipelineNum) {
+ super(String.format("Job %s (%s), Pipeline [(%s/%s)] have no enough resource.", jobName, jobId, pipelineIndex + 1, totalPipelineNum));
}
- public List<ExecutionEdge> getEdges() {
- return edges;
+ public JobNoEnoughResourceException(String message) {
+ super(message);
}
- public Map<Integer, ExecutionVertex> getVertexes() {
- return vertexes;
+ public JobNoEnoughResourceException(String message, Throwable cause) {
+ super(message, cause);
}
-
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
index 4b10e8d60..0b477a956 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/IdGenerator.java
@@ -26,9 +26,9 @@ import java.io.Serializable;
public class IdGenerator implements Serializable {
private static final long serialVersionUID = 7683323453014131725L;
- private int id = 0;
+ private long id = 0;
- public int getNextId() {
+ public long getNextId() {
id++;
return id;
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
index 5bedc6900..a3c6d2380 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
@@ -62,7 +62,7 @@ public class NonCompletableFuture<T> extends CompletableFuture<T> {
throw new UnsupportedOperationException("This future can't be completed by an outside caller");
}
- public void internalComplete(T value) {
+ private void internalComplete(T value) {
super.complete(value);
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index 0ede9cc6d..63aad155a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -27,20 +27,20 @@ public abstract class AbstractAction implements Action {
private String name;
private List<Action> upstreams = new ArrayList<>();
// This is used to assign a unique ID to every Action
- private int id;
+ private long id;
private int parallelism = 1;
private List<URL> jarUrls;
- protected AbstractAction(int id, @NonNull String name, @NonNull List<Action> upstreams, @NonNull List<URL> jarUrls) {
+ protected AbstractAction(long id, @NonNull String name, @NonNull List<Action> upstreams, @NonNull List<URL> jarUrls) {
this.id = id;
this.name = name;
this.upstreams = upstreams;
this.jarUrls = jarUrls;
}
- protected AbstractAction(int id, @NonNull String name, @NonNull List<URL> jarUrls) {
+ protected AbstractAction(long id, @NonNull String name, @NonNull List<URL> jarUrls) {
this.id = id;
this.name = name;
this.jarUrls = jarUrls;
@@ -79,7 +79,7 @@ public abstract class AbstractAction implements Action {
}
@Override
- public int getId() {
+ public long getId() {
return id;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index 6abe84bf3..e35e450f9 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -38,7 +38,7 @@ public interface Action extends Serializable {
void setParallelism(int parallelism);
- int getId();
+ long getId();
List<URL> getJarUrls();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
index 88e5e5881..c9e8f1d78 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PartitionTransformAction.java
@@ -27,7 +27,7 @@ import java.util.List;
public class PartitionTransformAction extends AbstractAction {
private final PartitionSeaTunnelTransform partitionTransformation;
- public PartitionTransformAction(int id,
+ public PartitionTransformAction(long id,
@NonNull String name,
@NonNull List<Action> upstreams,
@NonNull PartitionSeaTunnelTransform partitionTransformation,
@@ -36,7 +36,7 @@ public class PartitionTransformAction extends AbstractAction {
this.partitionTransformation = partitionTransformation;
}
- public PartitionTransformAction(int id,
+ public PartitionTransformAction(long id,
@NonNull String name,
@NonNull PartitionSeaTunnelTransform partitionTransformation,
@NonNull List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
index 8391be830..c95d7785f 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/PhysicalSourceAction.java
@@ -33,7 +33,7 @@ public class PhysicalSourceAction<T, SplitT extends SourceSplit, StateT extends
private final SeaTunnelSource<T, SplitT, StateT> source;
private final List<SeaTunnelTransform> transforms;
- public PhysicalSourceAction(int id,
+ public PhysicalSourceAction(long id,
@NonNull String name,
@NonNull SeaTunnelSource<T, SplitT, StateT> source,
@NonNull List<URL> jarUrls,
@@ -43,7 +43,7 @@ public class PhysicalSourceAction<T, SplitT extends SourceSplit, StateT extends
this.transforms = transforms;
}
- protected PhysicalSourceAction(int id, @NonNull String name, @NonNull List<Action> upstreams,
+ protected PhysicalSourceAction(long id, @NonNull String name, @NonNull List<Action> upstreams,
@NonNull SeaTunnelSource<T, SplitT, StateT> source,
@NonNull List<URL> jarUrls,
List<SeaTunnelTransform> transforms) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
index 47c387dcd..7deed1860 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -28,7 +28,7 @@ import java.util.List;
public class SinkAction<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends AbstractAction {
private SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
- public SinkAction(int id,
+ public SinkAction(long id,
@NonNull String name,
@NonNull List<Action> upstreams,
@NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
@@ -37,7 +37,7 @@ public class SinkAction<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends
this.sink = sink;
}
- public SinkAction(int id,
+ public SinkAction(long id,
@NonNull String name,
@NonNull SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
@NonNull List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
index 41b2bcb63..ed4fb1967 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SourceAction.java
@@ -32,7 +32,7 @@ public class SourceAction<T, SplitT extends SourceSplit, StateT extends Serializ
private static final long serialVersionUID = -4104531889750766731L;
private final SeaTunnelSource<T, SplitT, StateT> source;
- public SourceAction(int id,
+ public SourceAction(long id,
@NonNull String name,
@NonNull SeaTunnelSource<T, SplitT, StateT> source,
@NonNull List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
index c82f41947..5271a2933 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformAction.java
@@ -27,7 +27,7 @@ import java.util.List;
public class TransformAction extends AbstractAction {
private SeaTunnelTransform transform;
- public TransformAction(int id,
+ public TransformAction(long id,
@NonNull String name,
@NonNull List<Action> upstreams,
@NonNull SeaTunnelTransform transform,
@@ -36,7 +36,7 @@ public class TransformAction extends AbstractAction {
this.transform = transform;
}
- public TransformAction(int id,
+ public TransformAction(long id,
@NonNull String name,
@NonNull SeaTunnelTransform transform,
@NonNull List<URL> jarUrls) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
index 339dbc1a0..c18315505 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/TransformChainAction.java
@@ -29,7 +29,7 @@ public class TransformChainAction extends AbstractAction {
private static final long serialVersionUID = -340174711145367535L;
private final List<SeaTunnelTransform> transforms;
- public TransformChainAction(int id,
+ public TransformChainAction(long id,
@NonNull String name,
@NonNull List<Action> upstreams,
@NonNull List<URL> jarUrls,
@@ -38,7 +38,7 @@ public class TransformChainAction extends AbstractAction {
this.transforms = transforms;
}
- public TransformChainAction(int id,
+ public TransformChainAction(long id,
@NonNull String name,
@NonNull List<URL> jarUrls,
@NonNull List<SeaTunnelTransform> transforms) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
index df8776548..c9673b0ba 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/internal/IntermediateDataQueue.java
@@ -23,17 +23,17 @@ public class IntermediateDataQueue implements Serializable {
private static final long serialVersionUID = -3049265155605303992L;
- private final int id;
+ private final long id;
private final int parallelism;
private final String name;
- public IntermediateDataQueue(int id, String name, int parallelism) {
+ public IntermediateDataQueue(long id, String name, int parallelism) {
this.id = id;
this.name = name;
this.parallelism = parallelism;
}
- public int getId() {
+ public long getId() {
return id;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
index 69c7e635c..75fdbad67 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
@@ -60,7 +60,7 @@ public class LogicalDag implements IdentifiedDataSerializable {
private static final Logger LOG = LoggerFactory.getLogger(LogicalDag.class);
private JobConfig jobConfig;
private final Set<LogicalEdge> edges = new LinkedHashSet<>();
- private final Map<Integer, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
+ private final Map<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
private IdGenerator idGenerator;
public LogicalDag() {
@@ -84,7 +84,7 @@ public class LogicalDag implements IdentifiedDataSerializable {
return this.edges;
}
- public Map<Integer, LogicalVertex> getLogicalVertexMap() {
+ public Map<Long, LogicalVertex> getLogicalVertexMap() {
return logicalVertexMap;
}
@@ -128,8 +128,8 @@ public class LogicalDag implements IdentifiedDataSerializable {
public void writeData(ObjectDataOutput out) throws IOException {
out.writeInt(logicalVertexMap.size());
- for (Map.Entry<Integer, LogicalVertex> entry : logicalVertexMap.entrySet()) {
- out.writeInt(entry.getKey());
+ for (Map.Entry<Long, LogicalVertex> entry : logicalVertexMap.entrySet()) {
+ out.writeLong(entry.getKey());
out.writeObject(entry.getValue());
}
@@ -148,7 +148,7 @@ public class LogicalDag implements IdentifiedDataSerializable {
int vertexCount = in.readInt();
for (int i = 0; i < vertexCount; i++) {
- Integer key = in.readInt();
+ Long key = in.readLong();
LogicalVertex value = in.readObject();
logicalVertexMap.put(key, value);
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index 1ea396f7a..a03eb8db3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -40,9 +40,9 @@ public class LogicalDagGenerator {
private JobConfig jobConfig;
private IdGenerator idGenerator;
- private Map<Action, Collection<Integer>> alreadyTransformed = new HashMap<>();
+ private Map<Action, Collection<Long>> alreadyTransformed = new HashMap<>();
- private Map<Integer, LogicalVertex> logicalIdVertexMap = new HashMap<>();
+ private Map<Long, LogicalVertex> logicalIdVertexMap = new HashMap<>();
public LogicalDagGenerator(@NonNull List<Action> actions,
@NonNull JobConfig jobConfig,
@@ -63,12 +63,12 @@ public class LogicalDagGenerator {
return logicalDag;
}
- private Collection<Integer> transformAction(Action action) {
+ private Collection<Long> transformAction(Action action) {
if (alreadyTransformed.containsKey(action)) {
return alreadyTransformed.get(action);
}
- Collection<Integer> upstreamVertexIds = new ArrayList<>();
+ Collection<Long> upstreamVertexIds = new ArrayList<>();
List<Action> upstream = action.getUpstream();
if (!CollectionUtils.isEmpty(upstream)) {
for (Action upstreamAction : upstream) {
@@ -79,7 +79,7 @@ public class LogicalDagGenerator {
LogicalVertex logicalVertex =
new LogicalVertex(action.getId(), action, action.getParallelism());
logicalDag.addLogicalVertex(logicalVertex);
- Collection<Integer> transformedActions = Lists.newArrayList(logicalVertex.getVertexId());
+ Collection<Long> transformedActions = Lists.newArrayList(logicalVertex.getVertexId());
alreadyTransformed.put(action, transformedActions);
logicalIdVertexMap.put(logicalVertex.getVertexId(), logicalVertex);
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
index bfa7091d6..e7d906b9a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java
@@ -35,9 +35,9 @@ public class LogicalEdge implements IdentifiedDataSerializable {
private LogicalVertex leftVertex;
private LogicalVertex rightVertex;
- private Integer leftVertexId;
+ private Long leftVertexId;
- private Integer rightVertexId;
+ private Long rightVertexId;
public LogicalEdge(){}
@@ -48,7 +48,7 @@ public class LogicalEdge implements IdentifiedDataSerializable {
this.rightVertexId = rightVertex.getVertexId();
}
- public void recoveryFromVertexMap(@NonNull Map<Integer, LogicalVertex> vertexMap) {
+ public void recoveryFromVertexMap(@NonNull Map<Long, LogicalVertex> vertexMap) {
leftVertex = vertexMap.get(leftVertexId);
rightVertex = vertexMap.get(rightVertexId);
@@ -69,13 +69,13 @@ public class LogicalEdge implements IdentifiedDataSerializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
// To prevent circular serialization, we only serialize the ID of vertices for edges
- out.writeInt(leftVertexId);
- out.writeInt(rightVertexId);
+ out.writeLong(leftVertexId);
+ out.writeLong(rightVertexId);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
- leftVertexId = in.readInt();
- rightVertexId = in.readInt();
+ leftVertexId = in.readLong();
+ rightVertexId = in.readLong();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
index c9e145c86..d84b83521 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalVertex.java
@@ -32,7 +32,7 @@ import java.io.IOException;
@AllArgsConstructor
public class LogicalVertex implements IdentifiedDataSerializable {
- private Integer vertexId;
+ private Long vertexId;
private Action action;
private int parallelism;
@@ -51,14 +51,14 @@ public class LogicalVertex implements IdentifiedDataSerializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
- out.writeInt(vertexId);
+ out.writeLong(vertexId);
out.writeObject(action);
out.writeInt(parallelism);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
- vertexId = in.readInt();
+ vertexId = in.readLong();
action = in.readObject();
parallelism = in.readInt();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 26b2e76ff..90b74350d 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -17,8 +17,10 @@
package org.apache.seatunnel.engine.core.job;
+import java.util.concurrent.ExecutionException;
+
public interface Job {
long getJobId();
- void submitJob();
+ void submitJob() throws ExecutionException, InterruptedException;
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
index df6c2a370..aa4175247 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/PipelineState.java
@@ -73,7 +73,7 @@ public enum PipelineState {
/** Restoring last possible valid state of the pipeline if it has it. */
INITIALIZING;
- public boolean isEnd() {
+ public boolean isEndState() {
return this == FINISHED || this == CANCELED || this == FAILED;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 4159346e4..567f81440 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.instance.impl.Node;
@@ -27,6 +28,7 @@ import com.hazelcast.internal.services.MembershipAwareService;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.jet.impl.LiveOperationRegistry;
import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
@@ -39,6 +41,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
+ private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);
public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
private NodeEngineImpl nodeEngine;
@@ -114,20 +117,22 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
* call by client to submit job
*/
@SuppressWarnings("checkstyle:MagicNumber")
- public CompletableFuture<Void> submitJob(Data jobImmutableInformation) {
+ public NonCompletableFuture<Void> submitJob(Data jobImmutableInformation) {
CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService);
executorService.submit(() -> {
try {
jobMaster.init();
+ jobMaster.run();
} catch (Throwable e) {
- throw new RuntimeException(e);
+ LOGGER.severe("submit job error: " + e.getMessage());
+ voidCompletableFuture.completeExceptionally(e);
} finally {
// We specify that when init is complete, the submitJob is complete
voidCompletableFuture.complete(null);
}
- jobMaster.run();
+ //jobMaster.run();
});
- return voidCompletableFuture;
+ return new NonCompletableFuture(voidCompletableFuture);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 4d15b319a..b34ebe6ca 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -24,6 +24,7 @@ import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.stream.Collectors.partitioningBy;
import static java.util.stream.Collectors.toList;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
@@ -34,16 +35,20 @@ import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
+import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import com.hazelcast.internal.serialization.Data;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.properties.HazelcastProperties;
import lombok.NonNull;
import lombok.SneakyThrows;
+import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -112,14 +117,24 @@ public class TaskExecutionService {
uncheckRun(startedLatch::await);
}
- public CompletableFuture<TaskExecutionState> submitTaskGroup(
- TaskGroup taskGroup,
- CompletableFuture<Void> cancellationFuture
+ public NonCompletableFuture<TaskExecutionState> deployTask(
+ @NonNull Data taskImmutableInformation
) {
- taskGroup.init();
- Collection<Task> tasks = taskGroup.getTasks();
- final TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup);
+ CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
+ TaskGroup taskGroup = null;
try {
+ TaskGroupImmutableInformation taskImmutableInfo =
+ nodeEngine.getSerializationService().toObject(taskImmutableInformation);
+ Set<URL> jars = taskImmutableInfo.getJars();
+
+ // TODO Use the classloader to load the connector jars and deserialize the Task
+ taskGroup = nodeEngine.getSerializationService().toData(taskImmutableInfo.getGroup());
+ taskGroup.init();
+ Collection<Task> tasks = taskGroup.getTasks();
+
+ // TODO We need to add a method to cancel the task
+ CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
+ TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup, resultFuture);
ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<>();
final Map<Boolean, List<Task>> byCooperation =
tasks.stream()
@@ -134,9 +149,10 @@ public class TaskExecutionService {
taskGroup.setTasksContext(taskExecutionContextMap);
executionContexts.put(taskGroup.getId(), new TaskGroupContext(taskGroup));
} catch (Throwable t) {
- executionTracker.future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
+ logger.severe(ExceptionUtils.getMessage(t));
+ resultFuture.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
}
- return new NonCompletableFuture<>(executionTracker.future);
+ return new NonCompletableFuture<>(resultFuture);
}
private final class BlockingWorker implements Runnable {
@@ -158,7 +174,8 @@ public class TaskExecutionService {
ProgressState result;
do {
result = t.call();
- } while (!result.isDone() && !isShutdown && !tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
+ } while (!result.isDone() && !isShutdown &&
+ !tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
} catch (Throwable e) {
logger.warning("Exception in " + t, e);
tracker.taskGroupExecutionTracker.exception(e);
@@ -190,7 +207,8 @@ public class TaskExecutionService {
public LinkedBlockingDeque<TaskTracker> taskqueue;
@SuppressWarnings("checkstyle:MagicNumber")
- public CooperativeTaskWorker(LinkedBlockingDeque<TaskTracker> taskqueue, RunBusWorkSupplier runBusWorkSupplier) {
+ public CooperativeTaskWorker(LinkedBlockingDeque<TaskTracker> taskqueue,
+ RunBusWorkSupplier runBusWorkSupplier) {
logger.info(String.format("Created new BusWork : %s", this.hashCode()));
this.taskqueue = taskqueue;
this.timer = new TaskCallTimer(50, keep, runBusWorkSupplier, this);
@@ -287,7 +305,7 @@ public class TaskExecutionService {
public final class TaskGroupExecutionTracker {
private final TaskGroup taskGroup;
- final CompletableFuture<TaskExecutionState> future = new CompletableFuture<>();
+ final CompletableFuture<TaskExecutionState> future;
volatile List<Future<?>> blockingFutures = emptyList();
private final AtomicInteger completionLatch;
@@ -295,7 +313,9 @@ public class TaskExecutionService {
private final AtomicBoolean isCancel = new AtomicBoolean(false);
- TaskGroupExecutionTracker(CompletableFuture<Void> cancellationFuture, TaskGroup taskGroup) {
+ TaskGroupExecutionTracker(@NonNull CompletableFuture<Void> cancellationFuture, @NonNull TaskGroup taskGroup,
+ @NonNull CompletableFuture<TaskExecutionState> future) {
+ this.future = future;
this.completionLatch = new AtomicInteger(taskGroup.getTasks().size());
this.taskGroup = taskGroup;
cancellationFuture.whenComplete(withTryCatch(logger, (r, e) -> {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
index b1527e120..b46b4ef31 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionEdge.java
@@ -24,9 +24,9 @@ public class ExecutionEdge {
private ExecutionVertex leftVertex;
private ExecutionVertex rightVertex;
- private Integer leftVertexId;
+ private Long leftVertexId;
- private Integer rightVertexId;
+ private Long rightVertexId;
public ExecutionEdge(ExecutionVertex leftVertex, ExecutionVertex rightVertex) {
this.leftVertex = leftVertex;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 2c16829b0..fa3e38052 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -45,10 +45,10 @@ import java.util.stream.Collectors;
public class ExecutionPlanGenerator {
- private final Map<Integer, List<Integer>> edgeMap = new HashMap<>();
- private final Map<Integer, Action> actions = new HashMap<>();
+ private final Map<Long, List<Long>> edgeMap = new HashMap<>();
+ private final Map<Long, Action> actions = new HashMap<>();
- private final Map<Integer, LogicalVertex> logicalVertexes;
+ private final Map<Long, LogicalVertex> logicalVertexes;
private final List<LogicalEdge> logicalEdges;
private final JobImmutableInformation jobImmutableInformation;
@@ -80,7 +80,7 @@ public class ExecutionPlanGenerator {
next = newNext;
}
- Map<Integer, LogicalVertex> vertexes = new HashMap<>();
+ Map<Long, LogicalVertex> vertexes = new HashMap<>();
actions.forEach((key, value) -> vertexes.put(key, new LogicalVertex(key, value,
logicalVertexes.get(key).getParallelism())));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
index f39bb2d8e..f0b282048 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionVertex.java
@@ -25,7 +25,7 @@ import lombok.Data;
@Data
@AllArgsConstructor
public class ExecutionVertex {
- private Integer vertexId;
+ private Long vertexId;
private Action action;
private int parallelism;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
index b7def66ad..c3bf85cb2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
@@ -23,9 +23,9 @@ import java.util.Map;
public class Pipeline {
private final List<ExecutionEdge> edges;
- private final Map<Integer, ExecutionVertex> vertexes;
+ private final Map<Long, ExecutionVertex> vertexes;
- Pipeline(List<ExecutionEdge> edges, Map<Integer, ExecutionVertex> vertexes) {
+ Pipeline(List<ExecutionEdge> edges, Map<Long, ExecutionVertex> vertexes) {
this.edges = edges;
this.vertexes = vertexes;
}
@@ -34,7 +34,7 @@ public class Pipeline {
return edges;
}
- public Map<Integer, ExecutionVertex> getVertexes() {
+ public Map<Long, ExecutionVertex> getVertexes() {
return vertexes;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 1f8dab913..313c06194 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -34,7 +34,7 @@ public class PipelineGenerator {
// cache in the future
return edgesList.stream().map(e -> {
- Map<Integer, ExecutionVertex> vertexes = new HashMap<>();
+ Map<Long, ExecutionVertex> vertexes = new HashMap<>();
List<ExecutionEdge> pipelineEdges = e.stream().map(edge -> {
if (!vertexes.containsKey(edge.getLeftVertexId())) {
vertexes.put(edge.getLeftVertexId(), edge.getLeftVertex());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index ecd684a3a..ffbc1d6bb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -62,7 +62,7 @@ public class PhysicalPlan {
* in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} whenComplete method will be called.
*/
private final CompletableFuture<JobStatus> jobEndFuture;
-
+ private final NonCompletableFuture<JobStatus> nonCompletableFuture;
/**
* This future only can completion by the {@link SubPlan } subPlanFuture.
@@ -83,25 +83,38 @@ public class PhysicalPlan {
this.stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
this.jobStatus.set(JobStatus.CREATED);
this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
- this.jobEndFuture = new CompletableFuture<JobStatus>();
+ this.jobEndFuture = new CompletableFuture<>();
+ this.nonCompletableFuture = new NonCompletableFuture<>(jobEndFuture);
this.waitForCompleteBySubPlan = waitForCompleteBySubPlan;
this.pipelineList = pipelineList;
Arrays.stream(this.waitForCompleteBySubPlan).forEach(x -> {
x.whenComplete((v, t) -> {
+ // We need not handle t, because we will not return t from Pipeline
if (PipelineState.CANCELED.equals(v)) {
canceledPipelineNum.incrementAndGet();
} else if (PipelineState.FAILED.equals(v)) {
- failedPipelineNum.incrementAndGet();
+ LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
+ cancelJob().whenComplete((v1, t1) -> {
+ LOGGER.severe(String.format("Cancel other pipelines complete"));
+ failedPipelineNum.incrementAndGet();
+ });
+ } else if (!PipelineState.FINISHED.equals(v)) {
+ LOGGER.severe(
+ "Pipeline Failed with Unknown PipelineState, Begin to cancel other pipelines in this job.");
+ cancelJob().whenComplete((v1, t1) -> {
+ LOGGER.severe(String.format("Cancel other pipelines complete"));
+ failedPipelineNum.incrementAndGet();
+ });
}
if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
if (failedPipelineNum.get() > 0) {
- jobStatus.set(JobStatus.FAILING);
+ updateJobState(JobStatus.FAILING);
} else if (canceledPipelineNum.get() > 0) {
- jobStatus.set(JobStatus.CANCELED);
+ updateJobState(JobStatus.CANCELED);
} else {
- jobStatus.set(JobStatus.FINISHED);
+ updateJobState(JobStatus.FINISHED);
}
jobEndFuture.complete(jobStatus.get());
}
@@ -109,6 +122,16 @@ public class PhysicalPlan {
});
}
+ public NonCompletableFuture<Void> cancelJob() {
+ CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
+ // TODO Implement cancel pipeline in job.
+ return null;
+ });
+
+ cancelFuture.complete(null);
+ return new NonCompletableFuture<>(cancelFuture);
+ }
+
public List<SubPlan> getPipelineList() {
return pipelineList;
}
@@ -120,7 +143,11 @@ public class PhysicalPlan {
}
}
- public boolean updateJobState(JobStatus current, JobStatus targetState) {
+ public boolean updateJobState(@NonNull JobStatus targetState) {
+ return updateJobState(jobStatus.get(), targetState);
+ }
+
+ public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus targetState) {
// consistency check
if (current.isEndState()) {
String message = "Job is trying to leave terminal state " + current;
@@ -144,7 +171,15 @@ public class PhysicalPlan {
}
}
- public CompletableFuture<JobStatus> getJobEndCompletableFuture() {
- return this.jobEndFuture;
+ public NonCompletableFuture<JobStatus> getJobEndCompletableFuture() {
+ return this.nonCompletableFuture;
+ }
+
+ public JobImmutableInformation getJobImmutableInformation() {
+ return jobImmutableInformation;
+ }
+
+ public JobStatus getJobStatus() {
+ return jobStatus.get();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 056faa0d7..a13a0d17f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -122,7 +122,8 @@ public class PhysicalPlanGenerator {
coordinatorVertexList,
pipelineFuture,
waitForCompleteByPhysicalVertexList.toArray(
- new NonCompletableFuture[waitForCompleteByPhysicalVertexList.size()]));
+ new NonCompletableFuture[waitForCompleteByPhysicalVertexList.size()]),
+ jobImmutableInformation);
});
PhysicalPlan physicalPlan = new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
@@ -153,9 +154,10 @@ public class PhysicalPlanGenerator {
new SinkAggregatedCommitterTask(idGenerator.getNextId(), s);
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+ waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture(taskFuture));
- return new PhysicalVertex(atomicInteger.incrementAndGet(),
+ return new PhysicalVertex(idGenerator.getNextId(),
+ atomicInteger.incrementAndGet(),
executorService,
collect.size(),
new TaskGroupDefaultImpl("SinkAggregatedCommitterTask", Lists.newArrayList(t)),
@@ -163,7 +165,10 @@ public class PhysicalPlanGenerator {
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- null);
+ null,
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine);
}).collect(Collectors.toList());
}
@@ -182,7 +187,8 @@ public class PhysicalPlanGenerator {
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
- t.add(new PhysicalVertex(i,
+ t.add(new PhysicalVertex(idGenerator.getNextId(),
+ i,
executorService,
flow.getAction().getParallelism(),
new TaskGroupDefaultImpl("PartitionTransformTask", Lists.newArrayList(seaTunnelTask)),
@@ -190,7 +196,10 @@ public class PhysicalPlanGenerator {
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- seaTunnelTask.getJarsUrl()));
+ seaTunnelTask.getJarsUrl(),
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine));
}
return t.stream();
}).collect(Collectors.toList());
@@ -207,7 +216,8 @@ public class PhysicalPlanGenerator {
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
- return new PhysicalVertex(atomicInteger.incrementAndGet(),
+ return new PhysicalVertex(idGenerator.getNextId(),
+ atomicInteger.incrementAndGet(),
executorService,
sources.size(),
new TaskGroupDefaultImpl(s.getName(), Lists.newArrayList(t)),
@@ -215,7 +225,10 @@ public class PhysicalPlanGenerator {
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- t.getJarsUrl());
+ t.getJarsUrl(),
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine);
}).collect(Collectors.toList());
}
@@ -243,7 +256,8 @@ public class PhysicalPlanGenerator {
waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
// TODO We need give every task a appropriate name
- t.add(new PhysicalVertex(i,
+ t.add(new PhysicalVertex(idGenerator.getNextId(),
+ i,
executorService,
flow.getAction().getParallelism(),
new TaskGroupDefaultImpl("SourceTask",
@@ -252,7 +266,10 @@ public class PhysicalPlanGenerator {
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- jars));
+ jars,
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine));
}
return t.stream();
}).collect(Collectors.toList());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 879226c82..2751ce8ff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -17,22 +17,26 @@
package org.apache.seatunnel.engine.server.dag.physical;
-import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import com.hazelcast.cluster.Address;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
import java.net.URL;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
/**
* PhysicalVertex is responsible for the scheduling and execution of a single task parallel
@@ -43,12 +47,14 @@ public class PhysicalVertex {
private static final ILogger LOGGER = Logger.getLogger(PhysicalVertex.class);
+ private final long physicalVertexId;
+
/**
* the index of PhysicalVertex
*/
private final int subTaskGroupIndex;
- private final String taskNameWithSubtaskAndPipeline;
+ private final String taskFullName;
private final int parallelism;
@@ -64,19 +70,35 @@ public class PhysicalVertex {
private final Set<URL> pluginJarsUrls;
+ private AtomicReference<ExecutionState> executionState = new AtomicReference<>();
+
/**
* When PhysicalVertex status turn to end, complete this future. And then the waitForCompleteByPhysicalVertex
* in {@link SubPlan} whenComplete method will be called.
*/
private final CompletableFuture<TaskExecutionState> taskFuture;
+ /**
+ * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
+ * task transitioned into a certain state. The index into this array is the ordinal
+ * of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is at {@code
+ * stateTimestamps[RUNNING.ordinal()]}.
+ */
+ private final long[] stateTimestamps;
/**
* This future only can completion by the task run in {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
*/
private NonCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
- public PhysicalVertex(int subTaskGroupIndex,
+ private final JobImmutableInformation jobImmutableInformation;
+
+ private final long initializationTimestamp;
+
+ private final NodeEngine nodeEngine;
+
+ public PhysicalVertex(long physicalVertexId,
+ int subTaskGroupIndex,
@NonNull ExecutorService executorService,
int parallelism,
@NonNull TaskGroupDefaultImpl taskGroup,
@@ -84,7 +106,11 @@ public class PhysicalVertex {
@NonNull FlakeIdGenerator flakeIdGenerator,
int pipelineIndex,
int totalPipelineNum,
- Set<URL> pluginJarsUrls) {
+ Set<URL> pluginJarsUrls,
+ @NonNull JobImmutableInformation jobImmutableInformation,
+ long initializationTimestamp,
+ @NonNull NodeEngine nodeEngine) {
+ this.physicalVertexId = physicalVertexId;
this.subTaskGroupIndex = subTaskGroupIndex;
this.executorService = executorService;
this.parallelism = parallelism;
@@ -93,45 +119,132 @@ public class PhysicalVertex {
this.pipelineIndex = pipelineIndex;
this.totalPipelineNum = totalPipelineNum;
this.pluginJarsUrls = pluginJarsUrls;
- this.taskNameWithSubtaskAndPipeline =
+ this.jobImmutableInformation = jobImmutableInformation;
+ this.initializationTimestamp = initializationTimestamp;
+ stateTimestamps = new long[ExecutionState.values().length];
+ this.stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = initializationTimestamp;
+ this.executionState.set(ExecutionState.CREATED);
+ this.stateTimestamps[ExecutionState.CREATED.ordinal()] = System.currentTimeMillis();
+ this.nodeEngine = nodeEngine;
+ this.taskFullName =
String.format(
- "task: [%s (%d/%d)], pipeline: [%d/%d]",
+ "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
+ jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId(),
+ pipelineIndex + 1,
+ totalPipelineNum,
taskGroup.getTaskGroupName(),
subTaskGroupIndex + 1,
- parallelism,
- pipelineIndex,
- totalPipelineNum);
+ parallelism);
this.taskFuture = taskFuture;
}
@SuppressWarnings("checkstyle:MagicNumber")
- public void deploy() throws JobException {
-
- // TODO really submit job to ExecutionService and get a NonCompletableFuture<ExecutionState>
- long executionId = flakeIdGenerator.newId();
- CompletableFuture<TaskExecutionState> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ // This method must not throw an exception
+ public void deploy(@NonNull Address address) {
+ /**
+ TaskGroupImmutableInformation taskGroupImmutableInformation =
+ new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+ nodeEngine.getSerializationService().toData(this.taskGroup),
+ this.pluginJarsUrls);
+
+ try {
+ waitForCompleteByExecutionService = new NonCompletableFuture<>(
+ nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+ address)
+ .invoke());
+ } catch (Throwable th) {
+ LOGGER.severe(String.format("%s deploy error with Exception: %s",
+ this.taskFullName,
+ ExceptionUtils.getMessage(th)));
+ updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
+ taskFuture.complete(
+ new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, null));
+ }*/
+
+ waitForCompleteByExecutionService = new NonCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
try {
- Thread.sleep(5000);
- return new TaskExecutionState(executionId, ExecutionState.FINISHED, null);
+ Thread.sleep(2000);
} catch (InterruptedException e) {
- return new TaskExecutionState(executionId, ExecutionState.FAILED, e);
+ throw new RuntimeException(e);
}
- }, executorService);
+ return new TaskExecutionState(flakeIdGenerator.newId(), ExecutionState.FINISHED, null);
+ }));
- waitForCompleteByExecutionService = new NonCompletableFuture<TaskExecutionState>(uCompletableFuture);
+ updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
waitForCompleteByExecutionService.whenComplete((v, t) -> {
- if (t != null) {
- // TODO t.getMessage() need be replace
- LOGGER.info(String.format("The Task %s Failed with Exception: %s",
- this.taskNameWithSubtaskAndPipeline,
- t.getMessage()));
- taskFuture.complete(new TaskExecutionState(executionId, ExecutionState.FAILED, t));
- } else {
- LOGGER.info(String.format("The Task %s end with state %s",
- this.taskNameWithSubtaskAndPipeline,
- v));
+ try {
+                // We don't need to handle t, because TaskExecutionService never completes this future exceptionally
+ // v will never be null
+ updateTaskState(executionState.get(), v.getExecutionState());
+ if (v.getThrowable() != null) {
+ LOGGER.severe(String.format("%s end with state %s and Exception: %s",
+ this.taskFullName,
+ v.getExecutionState(),
+ ExceptionUtils.getMessage(v.getThrowable())));
+ } else {
+ LOGGER.severe(String.format("%s end with state %s",
+ this.taskFullName,
+ v.getExecutionState()));
+ }
+ taskFuture.complete(v);
+ } catch (Throwable th) {
+ LOGGER.severe(
+ String.format("%s end with Exception: %s", this.taskFullName, ExceptionUtils.getMessage(th)));
+ updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
+ v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, null);
taskFuture.complete(v);
}
});
}
+
+ public long getPhysicalVertexId() {
+ return physicalVertexId;
+ }
+
+ public boolean updateTaskState(@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
+ // consistency check
+ if (current.isEndState()) {
+ String message = "Task is trying to leave terminal state " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ if (ExecutionState.SCHEDULED.equals(targetState) && !ExecutionState.CREATED.equals(current)) {
+ String message = "Only [CREATED] task can turn to [SCHEDULED]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ if (ExecutionState.DEPLOYING.equals(targetState) && !ExecutionState.SCHEDULED.equals(current)) {
+ String message = "Only [SCHEDULED] task can turn to [DEPLOYING]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ if (ExecutionState.RUNNING.equals(targetState) && !ExecutionState.DEPLOYING.equals(current)) {
+ String message = "Only [DEPLOYING] task can turn to [RUNNING]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ // now do the actual state transition
+ if (executionState.get() == current) {
+ executionState.set(targetState);
+ LOGGER.info(String.format("%s turn from state %s to %s.",
+ taskFullName,
+ current,
+ targetState));
+
+ stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public TaskGroupDefaultImpl getTaskGroup() {
+ return taskGroup;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 8c1c4b0b6..4c2ba9dfc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.engine.server.dag.physical;
-import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineState;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
@@ -44,6 +44,8 @@ public class SubPlan {
private final int totalPipelineNum;
+ private final JobImmutableInformation jobImmutableInformation;
+
private AtomicInteger finishedTaskNum = new AtomicInteger(0);
private AtomicInteger canceledTaskNum = new AtomicInteger(0);
@@ -52,7 +54,7 @@ public class SubPlan {
private AtomicReference<PipelineState> pipelineState = new AtomicReference<>();
- private final String pipelineNameWithIndex;
+ private final String pipelineFullName;
/**
* Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when the
@@ -80,7 +82,8 @@ public class SubPlan {
@NonNull List<PhysicalVertex> physicalVertexList,
@NonNull List<PhysicalVertex> coordinatorVertexList,
@NonNull CompletableFuture<PipelineState> pipelineFuture,
- @NonNull NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex) {
+ @NonNull NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex,
+ @NonNull JobImmutableInformation jobImmutableInformation) {
this.pipelineIndex = pipelineIndex;
this.pipelineFuture = pipelineFuture;
this.totalPipelineNum = totalPipelineNum;
@@ -91,31 +94,44 @@ public class SubPlan {
this.stateTimestamps[PipelineState.INITIALIZING.ordinal()] = initializationTimestamp;
this.pipelineState.set(PipelineState.CREATED);
this.stateTimestamps[PipelineState.CREATED.ordinal()] = System.currentTimeMillis();
- this.pipelineNameWithIndex = String.format(
- "pipeline: [(%d/%d)]",
+ this.jobImmutableInformation = jobImmutableInformation;
+ this.pipelineFullName = String.format(
+ "Job %s (%s), Pipeline: [(%d/%d)]",
+ jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId(),
pipelineIndex + 1,
totalPipelineNum);
Arrays.stream(this.waitForCompleteByPhysicalVertex).forEach(x -> {
x.whenComplete((v, t) -> {
- if (ExecutionState.CANCELED.equals(v)) {
+                // We don't need to handle t, because PhysicalVertex never completes this future exceptionally
+ if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
canceledTaskNum.incrementAndGet();
- } else if (ExecutionState.FAILED.equals(v)) {
- failedTaskNum.incrementAndGet();
- } else {
- throw new JobException("Unknown Task end state [" + v + "]");
+ } else if (ExecutionState.FAILED.equals(v.getExecutionState())) {
+ LOGGER.severe("Task Failed, Begin to cancel other tasks in this pipeline.");
+ cancelPipeline().whenComplete((v1, t1) -> {
+ LOGGER.severe(String.format("Cancel other tasks complete"));
+ failedTaskNum.incrementAndGet();
+ });
+ } else if (!ExecutionState.FINISHED.equals(v.getExecutionState())) {
+ LOGGER.severe(
+ "Task Failed with Unknown ExecutionState, Begin to cancel other tasks in this pipeline.");
+ cancelPipeline().whenComplete((v1, t1) -> {
+ LOGGER.severe(String.format("Cancel other tasks complete"));
+ failedTaskNum.incrementAndGet();
+ });
}
if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
if (failedTaskNum.get() > 0) {
- LOGGER.info(String.format("Pipeline failed %s", this.pipelineNameWithIndex));
- pipelineState.set(PipelineState.FAILED);
+ updatePipelineState(PipelineState.FAILED);
+ LOGGER.info(String.format("%s end with state FAILED", this.pipelineFullName));
} else if (canceledTaskNum.get() > 0) {
- LOGGER.info(String.format("Pipeline canceled %s", this.pipelineNameWithIndex));
- pipelineState.set(PipelineState.CANCELED);
+ updatePipelineState(PipelineState.CANCELED);
+ LOGGER.info(String.format("%s end with state CANCELED", this.pipelineFullName));
} else {
- LOGGER.info(String.format("Pipeline finished %s", this.pipelineNameWithIndex));
- pipelineState.set(PipelineState.FINISHED);
+ updatePipelineState(PipelineState.FINISHED);
+ LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
}
pipelineFuture.complete(pipelineState.get());
}
@@ -123,6 +139,67 @@ public class SubPlan {
});
}
+ public boolean updatePipelineState(@NonNull PipelineState targetState) {
+ return updatePipelineState(pipelineState.get(), targetState);
+ }
+
+ public boolean updatePipelineState(@NonNull PipelineState current, @NonNull PipelineState targetState) {
+ // consistency check
+ if (current.isEndState()) {
+ String message = "Pipeline is trying to leave terminal state " + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ if (PipelineState.SCHEDULED.equals(targetState) && !PipelineState.CREATED.equals(current)) {
+ String message = "Only [CREATED] pipeline can turn to [SCHEDULED]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ if (PipelineState.DEPLOYING.equals(targetState) && !PipelineState.SCHEDULED.equals(current)) {
+ String message = "Only [SCHEDULED] pipeline can turn to [DEPLOYING]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ if (PipelineState.RUNNING.equals(targetState) && !PipelineState.DEPLOYING.equals(current)) {
+ String message = "Only [DEPLOYING] pipeline can turn to [RUNNING]" + current;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ // now do the actual state transition
+ if (pipelineState.get() == current) {
+ pipelineState.set(targetState);
+ LOGGER.info(String.format("%s turn from state %s to %s.",
+ pipelineFullName,
+ current,
+ targetState));
+
+ stateTimestamps[targetState.ordinal()] = System.currentTimeMillis();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public NonCompletableFuture<Void> cancelPipeline() {
+ CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
+            // TODO Implement cancellation of the tasks in this pipeline.
+ return null;
+ });
+
+ cancelFuture.complete(null);
+ return new NonCompletableFuture<>(cancelFuture);
+ }
+
+ public void failedWithNoEnoughResource() {
+ LOGGER.severe(String.format("%s failed with have no enough resource to run.", this.getPipelineFullName()));
+ updatePipelineState(PipelineState.SCHEDULED, PipelineState.FAILED);
+ pipelineFuture.complete(PipelineState.FAILED);
+ }
+
public int getPipelineIndex() {
return pipelineIndex;
}
@@ -134,4 +211,8 @@ public class SubPlan {
public List<PhysicalVertex> getCoordinatorVertexList() {
return coordinatorVertexList;
}
+
+ public String getPipelineFullName() {
+ return pipelineFullName;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
index b2db98f18..b31d5bea3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
@@ -23,17 +23,15 @@ package org.apache.seatunnel.engine.server.execution;
* the state {@code CREATED} and switch states according to this diagram:
*
* <pre>{@code
- * CREATED -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
- * | | | | |
- * | | | +-----+--------------+
- * | | V V
- * | | CANCELLING -----+----> CANCELED
- * | | |
- * | +-------------------------+
- * |
- * | ... -> FAILED
- * V
- * RECONCILING -> INITIALIZING | RUNNING | FINISHED | CANCELED | FAILED
+ * INITIALIZING -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
+ * | | | |
+ * | | +-----+--------------+
+ * | V V
+ * | CANCELLING -----+----> CANCELED
+ * | |
+ * +-------------------------+
+ *
+ * ... -> FAILED
*
* }</pre>
*
@@ -73,7 +71,7 @@ public enum ExecutionState {
/** Restoring last possible valid state of the task if it has it. */
INITIALIZING;
- public boolean isEnd() {
+ public boolean isEndState() {
return this == FINISHED || this == CANCELED || this == FAILED;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 6c9d89517..cfb3352bc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -17,11 +17,18 @@
package org.apache.seatunnel.engine.server.master;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.SimpleResourceManager;
+import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
+import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
@@ -30,6 +37,7 @@ import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public class JobMaster implements Runnable {
@@ -45,6 +53,10 @@ public class JobMaster implements Runnable {
private FlakeIdGenerator flakeIdGenerator;
+ private ResourceManager resourceManager;
+
+ private CompletableFuture<JobStatus> jobMasterCompleteFuture = new CompletableFuture<>();
+
public JobMaster(@NonNull Data jobImmutableInformation,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService) {
@@ -53,6 +65,8 @@ public class JobMaster implements Runnable {
this.executorService = executorService;
flakeIdGenerator =
this.nodeEngine.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
+
+ this.resourceManager = new SimpleResourceManager();
}
public void init() throws Exception {
@@ -61,23 +75,49 @@ public class JobMaster implements Runnable {
LOGGER.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
// TODO Use classloader load the connector jars and deserialize logicalDag
- this.logicalDag = new LogicalDag();
+ this.logicalDag = nodeEngine.getSerializationService().toObject(jobInformation.getLogicalDag());
physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag,
- nodeEngine,
- jobInformation,
- System.currentTimeMillis(),
- executorService,
- flakeIdGenerator);
+ nodeEngine,
+ jobInformation,
+ System.currentTimeMillis(),
+ executorService,
+ flakeIdGenerator);
}
@SuppressWarnings("checkstyle:MagicNumber")
@Override
public void run() {
try {
- LOGGER.info("I will sleep 2000ms");
- Thread.sleep(2000);
- } catch (Exception e) {
- throw new RuntimeException(e);
+ NonCompletableFuture<JobStatus> jobStatusNonCompletableFuture = physicalPlan.getJobEndCompletableFuture();
+
+ jobStatusNonCompletableFuture.whenComplete((v, t) -> {
+                // We don't need to handle t, because physicalPlan never completes this future exceptionally
+ if (JobStatus.FAILING.equals(v)) {
+ cleanJob();
+ physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
+ }
+ jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
+ });
+
+ JobScheduler jobScheduler = new PipelineBaseScheduler(physicalPlan, this);
+ jobScheduler.startScheduling();
+ } catch (Throwable e) {
+ LOGGER.severe(String.format("Job %s (%s) run error with: %s",
+ physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
+ physicalPlan.getJobImmutableInformation().getJobId(),
+ ExceptionUtils.getMessage(e)));
+ // try to cancel job
+ physicalPlan.cancelJob();
+ } finally {
+ jobMasterCompleteFuture.join();
}
}
+
+ public void cleanJob() {
+ // TODO clean something
+ }
+
+ public ResourceManager getResourceManager() {
+ return resourceManager;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
index 58227776e..543197559 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -22,17 +22,16 @@ import static com.hazelcast.jet.impl.util.ExceptionUtil.stackTraceToString;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
import static com.hazelcast.spi.impl.operationservice.ExceptionAction.THROW_EXCEPTION;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
-import com.hazelcast.jet.JetException;
import com.hazelcast.nio.serialization.HazelcastSerializationException;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.operationservice.ExceptionAction;
import com.hazelcast.spi.impl.operationservice.Operation;
-import java.util.concurrent.CompletableFuture;
-
/**
* Base class for async operations. Handles registration/deregistration of
* operations from live registry, exception handling and peeling and
@@ -48,7 +47,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
@Override
public final void run() {
- CompletableFuture<?> future;
+ NonCompletableFuture<?> future;
try {
future = doRun();
} catch (Exception e) {
@@ -59,7 +58,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
future.whenComplete(withTryCatch(getLogger(), (r, f) -> doSendResponse(f != null ? peel(f) : r)));
}
- protected abstract CompletableFuture<?> doRun() throws Exception;
+ protected abstract NonCompletableFuture<?> doRun() throws Exception;
@Override
public final boolean returnsResponse() {
@@ -87,7 +86,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
// the response will not be sent and the operation will hang.
// To prevent this from happening, replace the exception with
// another exception that can be serialized.
- sendResponse(new JetException(stackTraceToString(ex)));
+ sendResponse(new SeaTunnelEngineException(stackTraceToString(ex)));
} else {
throw e;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
index 689f697be..8015895a4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
@@ -17,48 +17,50 @@
package org.apache.seatunnel.engine.server.operation;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
+import lombok.NonNull;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-public class SubmitJobOperation extends AsyncOperation {
- private Data jobImmutableInformation;
+public class DeployTaskOperation extends AsyncOperation {
+ private Data taskImmutableInformation;
- public SubmitJobOperation() {
+ public DeployTaskOperation() {
}
- public SubmitJobOperation(Data jobImmutableInformation) {
- this.jobImmutableInformation = jobImmutableInformation;
+ public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
+ this.taskImmutableInformation = taskImmutableInformation;
+ }
+
+ @Override
+ protected NonCompletableFuture<?> doRun() throws Exception {
+ TaskExecutionService taskExecutionService = getService();
+ NonCompletableFuture<TaskExecutionState> voidCompletableFuture = taskExecutionService.deployTask(taskImmutableInformation);
+ return voidCompletableFuture;
}
@Override
public int getClassId() {
- return OperationDataSerializerHook.SUBMIT_OPERATOR;
+ return OperationDataSerializerHook.DEPLOY_TASK_OPERATOR;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- IOUtil.writeData(out, jobImmutableInformation);
+ IOUtil.writeData(out, taskImmutableInformation);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- jobImmutableInformation = IOUtil.readData(in);
- }
-
- @Override
- protected CompletableFuture<?> doRun() throws Exception {
- SeaTunnelServer seaTunnelServer = getService();
- CompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
- return voidCompletableFuture;
+ taskImmutableInformation = IOUtil.readData(in);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
index 689f697be..419712111 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.operation;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
@@ -24,9 +25,9 @@ import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
+import lombok.NonNull;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
public class SubmitJobOperation extends AsyncOperation {
private Data jobImmutableInformation;
@@ -34,7 +35,7 @@ public class SubmitJobOperation extends AsyncOperation {
public SubmitJobOperation() {
}
- public SubmitJobOperation(Data jobImmutableInformation) {
+ public SubmitJobOperation(@NonNull Data jobImmutableInformation) {
this.jobImmutableInformation = jobImmutableInformation;
}
@@ -56,9 +57,9 @@ public class SubmitJobOperation extends AsyncOperation {
}
@Override
- protected CompletableFuture<?> doRun() throws Exception {
+ protected NonCompletableFuture<?> doRun() throws Exception {
SeaTunnelServer seaTunnelServer = getService();
- CompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
+ NonCompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
return voidCompletableFuture;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
similarity index 74%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 8fceed6c9..1ff9e4b76 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -15,10 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.scheduler;
+package org.apache.seatunnel.engine.server.resourcemanager;
-public interface JobScheduler {
- void startScheduling();
+import com.hazelcast.cluster.Address;
+import lombok.NonNull;
- boolean updateExecutionState();
+public interface ResourceManager {
+ Address applyForResource(Long jobId, Long taskId);
+
+ @NonNull
+ Address getAppliedResource(Long jobId, Long taskId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
new file mode 100644
index 000000000..209f3fb69
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/SimpleResourceManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.exception.JobException;
+
+import com.hazelcast.cluster.Address;
+import lombok.Data;
+import lombok.NonNull;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+@Data
+public class SimpleResourceManager implements ResourceManager {
+
+    // TODO We may need a more detailed resource definition, instead of identifying a resource only by its Address.
+ private Map<Long, Map<Long, Address>> physicalVertexIdAndResourceMap = new HashMap<>();
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Override
+ public Address applyForResource(@NonNull Long jobId, @NonNull Long taskId) {
+ try {
+ Map<Long, Address> jobAddressMap = physicalVertexIdAndResourceMap.get(jobId);
+ if (jobAddressMap == null) {
+ jobAddressMap = new HashMap<>();
+ physicalVertexIdAndResourceMap.put(jobId, jobAddressMap);
+ }
+
+ Address localhost =
+ jobAddressMap.putIfAbsent(taskId, new Address("localhost", 5801));
+
+ if (null == localhost) {
+ localhost = jobAddressMap.get(taskId);
+ }
+
+ return localhost;
+
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ @NonNull
+ public Address getAppliedResource(@NonNull Long jobId, @NonNull Long taskId) {
+ Map<Long, Address> longAddressMap = physicalVertexIdAndResourceMap.get(jobId);
+ if (null == longAddressMap || longAddressMap.isEmpty()) {
+ throw new JobException(
+ String.format("Job %s, Task %s can not found applied resource.", jobId, taskId));
+ }
+
+ return longAddressMap.get(taskId);
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
index 8fceed6c9..bbe003e39 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/JobScheduler.java
@@ -19,6 +19,4 @@ package org.apache.seatunnel.engine.server.scheduler;
public interface JobScheduler {
void startScheduling();
-
- boolean updateExecutionState();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
new file mode 100644
index 000000000..5d608a62a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.scheduler;
+
+import org.apache.seatunnel.engine.common.exception.JobNoEnoughResourceException;
+import org.apache.seatunnel.engine.core.job.PipelineState;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.NonNull;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+public class PipelineBaseScheduler implements JobScheduler {
+ private static final ILogger LOGGER = Logger.getLogger(PipelineBaseScheduler.class);
+ private final PhysicalPlan physicalPlan;
+ private final JobMaster jobMaster;
+ private final ResourceManager resourceManager;
+
+ public PipelineBaseScheduler(@NonNull PhysicalPlan physicalPlan, @NonNull JobMaster jobMaster) {
+ this.physicalPlan = physicalPlan;
+ this.jobMaster = jobMaster;
+ this.resourceManager = jobMaster.getResourceManager();
+ }
+
+ @Override
+ public void startScheduling() {
+ physicalPlan.turnToRunning();
+ physicalPlan.getPipelineList().forEach(pipeline -> {
+ pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED);
+ if (applyResourceForPipeline(pipeline)) {
+ // deploy pipeline
+ deployPipeline(pipeline);
+ } else {
+ pipeline.failedWithNoEnoughResource();
+ }
+ });
+ }
+
+ private boolean applyResourceForPipeline(@NonNull SubPlan subPlan) {
+ try {
+ // apply resource for coordinators
+ subPlan.getCoordinatorVertexList().forEach(coordinator -> {
+ // TODO If there is no enough resources for tasks, we need add some wait profile
+ coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+ resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
+ coordinator.getTaskGroup().getId());
+ });
+
+ // apply resource for other tasks
+ subPlan.getPhysicalVertexList().forEach(task -> {
+ task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+ resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
+ task.getTaskGroup().getId());
+ });
+ } catch (JobNoEnoughResourceException e) {
+ LOGGER.severe(e);
+ return false;
+ }
+
+ return true;
+ }
+
+ private void deployPipeline(@NonNull SubPlan pipeline) {
+ pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING);
+ List<CompletableFuture> deployCoordinatorFuture =
+ pipeline.getCoordinatorVertexList().stream().map(coordinator -> {
+ if (coordinator.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
+ // deploy is a time-consuming operation, so we do it async
+ return CompletableFuture.supplyAsync(() -> {
+ coordinator.deploy(
+ resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
+ coordinator.getTaskGroup().getId()));
+ return null;
+ });
+ }
+ return null;
+ }).filter(x -> x != null).collect(Collectors.toList());
+
+ List<CompletableFuture> deployTaskFuture =
+ pipeline.getPhysicalVertexList().stream().map(task -> {
+ if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
+ return CompletableFuture.supplyAsync(() -> {
+ task.deploy(
+ resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
+ task.getTaskGroup().getId()));
+ return null;
+ });
+ }
+ return null;
+ }).filter(x -> x != null).collect(Collectors.toList());
+
+ deployCoordinatorFuture.addAll(deployTaskFuture);
+ CompletableFuture.allOf(deployCoordinatorFuture.toArray(new CompletableFuture[deployCoordinatorFuture.size()]))
+ .whenComplete((v, t) -> {
+ pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING);
+ });
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 3153413d4..07db59e4e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
@@ -36,6 +37,7 @@ import com.hazelcast.spi.annotation.PrivateApi;
public final class OperationDataSerializerHook implements DataSerializerHook {
public static final int PRINT_MESSAGE_OPERATOR = 0;
public static final int SUBMIT_OPERATOR = 1;
+ public static final int DEPLOY_TASK_OPERATOR = 2;
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -61,6 +63,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
return new PrintMessageOperation();
case SUBMIT_OPERATOR:
return new SubmitJobOperation();
+ case DEPLOY_TASK_OPERATOR:
+ return new DeployTaskOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index a59721b78..2e09b137b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
-import org.apache.seatunnel.engine.server.task.TaskGroupInfo;
+import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.AssignSplitOperation;
import org.apache.seatunnel.engine.server.task.operation.RegisterOperation;
import org.apache.seatunnel.engine.server.task.operation.RequestSplitOperation;
@@ -64,7 +64,7 @@ public class TaskDataSerializerHook implements DataSerializerHook {
case ASSIGN_SPLIT_TYPE:
return new AssignSplitOperation();
case TASK_GROUP_INFO_TYPE:
- return new TaskGroupInfo();
+ return new TaskGroupImmutableInformation();
default:
return null;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
similarity index 84%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
index e05ddcf9b..cc1f07852 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupInfo.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
@@ -24,31 +24,23 @@ import com.hazelcast.internal.serialization.Data;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
import java.io.IOException;
import java.net.URL;
import java.util.Set;
-public class TaskGroupInfo implements IdentifiedDataSerializable {
+@lombok.Data
+@AllArgsConstructor
+public class TaskGroupImmutableInformation implements IdentifiedDataSerializable {
+ // Each deployment generates a new executionId
+ private long executionId;
private Data group;
private Set<URL> jars;
- public TaskGroupInfo() {
- }
-
- public TaskGroupInfo(Data group, Set<URL> jars) {
- this.group = group;
- this.jars = jars;
- }
-
- public Data getGroup() {
- return group;
- }
-
- public Set<URL> getJars() {
- return jars;
+ public TaskGroupImmutableInformation(){
}
@Override
@@ -63,12 +55,14 @@ public class TaskGroupInfo implements IdentifiedDataSerializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
+ out.writeLong(executionId);
out.writeObject(jars);
IOUtil.writeData(out, group);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
+ executionId = in.readLong();
jars = in.readObject();
group = IOUtil.readData(in);
}