You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/19 03:53:24 UTC
[incubator-seatunnel] branch st-engine updated: [ST-Engine] Add wait for job complete feature (#2413)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 8d32d1799 [ST-Engine] Add wait for job complete feature (#2413)
8d32d1799 is described below
commit 8d32d179981cc8de87af981ade28558cd44aeca5
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Aug 19 11:53:17 2022 +0800
[ST-Engine] Add wait for job complete feature (#2413)
* Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
* add source file to licenserc.yaml
* fix checkstyle
* fix error
* tmp
* tmp
* Update PhysicalPlan to support scheduler by pipeline
* remove init in run
* tmp
* complete scheduler
* optimize scheduler
* fix checkstyle
* fix checkstyle
* Add wait for job complete in JobProxy
* Add wait for job complete in JobProxy
* add license header
fix style
optimize code
* fix merge error
* fix test error
* fix ci error
optimize ci
* fix code conflicts
* fix review error
---
.github/workflows/backend.yml | 2 +
.github/workflows/engine_backend.yml | 2 +
.licenserc.yaml | 1 +
LICENSE | 2 +
.../core/starter/seatunnel/SeaTunnelStarter.java | 10 +-
.../seatunnel/engine/client/SeaTunnelClient.java | 2 +
.../engine/client/SeaTunnelClientInstance.java | 2 +
.../engine/client/SeaTunnelHazelcastClient.java | 51 ++++-
.../engine/client/{ => job}/JobClient.java | 3 +-
.../engine/client/{ => job}/JobConfigParser.java | 6 +-
.../client/{ => job}/JobExecutionEnvironment.java | 26 +--
.../engine/client/{ => job}/JobProxy.java | 39 +++-
.../engine/client/JobConfigParserTest.java | 1 +
.../engine/client/LogicalDagGeneratorTest.java | 4 +-
.../engine/client/SeaTunnelClientTest.java | 24 ++-
...leFuture.java => PassiveCompletableFuture.java} | 19 +-
.../org/apache/seatunnel/engine/core/job/Job.java | 2 +
.../protocol/codec/SeaTunnelPrintMessageCodec.java | 3 -
.../protocol/codec/SeaTunnelSubmitJobCodec.java | 32 +++-
....java => SeaTunnelWaitForJobCompleteCodec.java} | 54 +++---
.../SeaTunnelEngine.yaml | 26 +++
.../seatunnel/engine/server/SeaTunnelServer.java | 36 +++-
.../engine/server/TaskExecutionService.java | 9 +-
.../engine/server/dag/physical/PhysicalPlan.java | 18 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 205 +++++++++++----------
.../engine/server/dag/physical/PhysicalVertex.java | 66 ++++---
.../engine/server/dag/physical/SubPlan.java | 10 +-
.../seatunnel/engine/server/master/JobMaster.java | 35 ++--
...eration.java => AbstractJobAsyncOperation.java} | 31 +---
.../engine/server/operation/AsyncOperation.java | 6 +-
.../operation/CheckpointTriggerOperation.java | 4 +-
.../server/operation/DeployTaskOperation.java | 10 +-
.../server/operation/SubmitJobOperation.java | 12 +-
.../operation/WaitForJobCompleteOperation.java} | 29 +--
.../task/SeaTunnelMessageTaskFactoryProvider.java | 9 +-
.../engine/server/protocol/task/SubmitJobTask.java | 5 +-
...mitJobTask.java => WaitForJobCompleteTask.java} | 21 +--
.../serializable/OperationDataSerializerHook.java | 6 +-
.../seatunnel/engine/server/dag/TaskTest.java | 7 +-
39 files changed, 501 insertions(+), 329 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 0f681028c..0bc67c0a0 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -24,6 +24,8 @@ on:
- '**/*.md'
- 'seatunnel-ui/**'
- 'seatunnel-engine/**'
+ - 'seatunnel-engine/seatunnel-engine-e2e/**'
+ - 'seatunnel-examples/seatunnel-engine-examples/**'
concurrency:
group: backend-${{ github.event.pull_request.number || github.ref }}
diff --git a/.github/workflows/engine_backend.yml b/.github/workflows/engine_backend.yml
index 7bc593944..5dcca74ad 100644
--- a/.github/workflows/engine_backend.yml
+++ b/.github/workflows/engine_backend.yml
@@ -21,6 +21,8 @@ on:
pull_request:
paths:
- 'seatunnel-engine/**'
+ - 'seatunnel-engine/seatunnel-engine-e2e/**'
+ - 'seatunnel-examples/seatunnel-engine-examples/**'
concurrency:
group: backend-${{ github.event.pull_request.number || github.ref }}
diff --git a/.licenserc.yaml b/.licenserc.yaml
index a105be8bc..fd6bd426a 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -46,5 +46,6 @@ header:
- 'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java'
- 'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java'
- 'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java'
+ - 'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java'
comment: on-failure
diff --git a/LICENSE b/LICENSE
index eb58900c3..f7731139d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -220,10 +220,12 @@ generate_client_protocol.sh
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AbstractJobAsyncOperation.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java from https://github.com/apache/flink
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java from https://github.com/hazelcast/hazelcast
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
index 7d2238421..90b12b48e 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
+++ b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
import org.apache.seatunnel.core.starter.utils.FileUtils;
-import org.apache.seatunnel.engine.client.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.JobProxy;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.JobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -45,12 +45,12 @@ public class SeaTunnelStarter {
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
+ JobProxy jobProxy;
try {
- JobProxy jobProxy = jobExecutionEnv.execute();
+ jobProxy = jobExecutionEnv.execute();
+ jobProxy.waitForJobComplete();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
-
- // TODO wait for job complete and then exit
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index e182c527e..d89b24570 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.engine.client.job.JobClient;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 62590cbef..ccb828119 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.client;
+import org.apache.seatunnel.engine.client.job.JobClient;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.JobConfig;
public interface SeaTunnelClientInstance {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index c22faeb1c..3700378aa 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -18,9 +18,11 @@
package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.client.impl.ClientDelegatingFuture;
+import com.hazelcast.client.impl.clientside.ClientMessageDecoder;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.protocol.ClientMessage;
@@ -32,7 +34,6 @@ import com.hazelcast.logging.ILogger;
import lombok.NonNull;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class SeaTunnelHazelcastClient {
@@ -76,8 +77,9 @@ public class SeaTunnelHazelcastClient {
return requestAndDecodeResponse(null, request, decoder);
}
- public <S> S requestAndDecodeResponse(UUID uuid, ClientMessage request,
- Function<ClientMessage, Object> decoder) {
+ public <S> S requestAndDecodeResponse(@NonNull UUID uuid,
+ @NonNull ClientMessage request,
+ @NonNull Function<ClientMessage, Object> decoder) {
ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
try {
ClientMessage response = invocation.invoke().get();
@@ -90,21 +92,52 @@ public class SeaTunnelHazelcastClient {
}
}
- public NonCompletableFuture<Void> requestAndGetCompletableFuture(UUID uuid, ClientMessage request) {
+ public <T> PassiveCompletableFuture<T> requestAndGetCompletableFuture(@NonNull UUID uuid,
+ @NonNull ClientMessage request,
+ @NonNull
+ ClientMessageDecoder clientMessageDecoder) {
ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
try {
- return new NonCompletableFuture<>(invocation.invoke().thenApply(c -> null));
+
+ return new PassiveCompletableFuture<T>(new ClientDelegatingFuture<>(
+ invocation.invoke(),
+ serializationService,
+ clientMessageDecoder
+ ));
} catch (Throwable t) {
throw ExceptionUtil.rethrow(t);
}
}
- public NonCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
+ public <T> PassiveCompletableFuture<T> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request,
+ @NonNull
+ ClientMessageDecoder clientMessageDecoder) {
UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
- return requestAndGetCompletableFuture(masterUuid, request);
+ return requestAndGetCompletableFuture(masterUuid, request, clientMessageDecoder);
+ }
+
+ public <T> PassiveCompletableFuture<T> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request,
+ @NonNull
+ ClientMessageDecoder clientMessageDecoder) {
+ return requestAndGetCompletableFuture(null, request, clientMessageDecoder);
}
- public CompletableFuture<Void> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request) {
+ public PassiveCompletableFuture<Void> requestAndGetCompletableFuture(@NonNull UUID uuid,
+ @NonNull ClientMessage request) {
+ ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
+ try {
+ return new PassiveCompletableFuture(invocation.invoke().thenApply(r -> null));
+ } catch (Throwable t) {
+ throw ExceptionUtil.rethrow(t);
+ }
+ }
+
+ public PassiveCompletableFuture<Void> requestOnAnyMemberAndGetCompletableFuture(@NonNull ClientMessage request) {
return requestAndGetCompletableFuture(null, request);
}
+
+ public PassiveCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(@NonNull ClientMessage request) {
+ UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
+ return requestAndGetCompletableFuture(masterUuid, request);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
similarity index 92%
copy from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 5eb391b31..964a7891e 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
similarity index 98%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 8f37bf96f..4d1850bcf 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.engine.client.ConnectorInstanceLoader;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -75,7 +76,8 @@ public class JobConfigParser {
private JobConfig jobConfig;
- protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull IdGenerator idGenerator,
+ public JobConfigParser(@NonNull String jobDefineFilePath,
+ @NonNull IdGenerator idGenerator,
@NonNull JobConfig jobConfig) {
this.jobDefineFilePath = jobDefineFilePath;
this.idGenerator = idGenerator;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
similarity index 79%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 45bf325fb..5c83874e7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -1,13 +1,12 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -80,10 +80,10 @@ public class JobExecutionEnvironment {
JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
initSeaTunnelContext();
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
- jobClient.getNewJobId(),
- seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
- jobConfig,
- jarUrls);
+ jobClient.getNewJobId(),
+ seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
+ jobConfig,
+ jarUrls);
JobProxy jobProxy = jobClient.createJobProxy(jobImmutableInformation);
jobProxy.submitJob();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
similarity index 57%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
index 84b80770d..dcd0235e2 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
@@ -15,12 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.logging.ILogger;
@@ -47,10 +51,37 @@ public class JobProxy implements Job {
@Override
public void submitJob() throws ExecutionException, InterruptedException {
- ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(
+ ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(jobImmutableInformation.getJobId(),
seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
- NonCompletableFuture<Void> submitJobFuture =
+ PassiveCompletableFuture<Void> submitJobFuture =
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
submitJobFuture.get();
}
+
+ @Override
+ public void waitForJobComplete() {
+ PassiveCompletableFuture<JobStatus> jobFuture =
+ seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+ SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
+ response -> {
+ return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
+ });
+
+ jobFuture.whenComplete((v, t) -> {
+ if (null != t) {
+ LOGGER.info(String.format("Job %s (%s) end with state %s, and throw Exception: %s",
+ jobImmutableInformation.getJobId(),
+ jobImmutableInformation.getJobConfig().getName(),
+ v,
+ ExceptionUtils.getMessage(t)));
+ } else {
+ LOGGER.info(String.format("Job %s (%s) end with state %s",
+ jobImmutableInformation.getJobId(),
+ jobImmutableInformation.getJobConfig().getName(),
+ v));
+ }
+ });
+
+ jobFuture.join();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index 4534c0a7b..919b492d5 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.job.JobConfigParser;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index a73579fd4..e44328148 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.engine.client.job.JobConfigParser;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -48,7 +49,8 @@ public class LogicalDagGeneratorTest {
jobConfig.setName("fake_to_file");
IdGenerator idGenerator = new IdGenerator();
- ImmutablePair<List<Action>, Set<URL>> immutablePair = new JobConfigParser(filePath, idGenerator, new JobConfig()).parse();
+ ImmutablePair<List<Action>, Set<URL>> immutablePair =
+ new JobConfigParser(filePath, idGenerator, new JobConfig()).parse();
LogicalDagGenerator logicalDagGenerator =
new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index f641a5bac..6a1c992a5 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.JobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
@@ -37,6 +39,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.util.concurrent.ExecutionException;
+
@SuppressWarnings("checkstyle:MagicNumber")
@RunWith(JUnit4.class)
public class SeaTunnelClientTest {
@@ -47,8 +51,8 @@ public class SeaTunnelClientTest {
public void beforeClass() throws Exception {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
instance = HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
- Thread.currentThread().getName(),
- new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
+ Thread.currentThread().getName(),
+ new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
}
@Test
@@ -74,13 +78,15 @@ public class SeaTunnelClientTest {
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
- // JobProxy jobProxy;
- // try {
- // jobProxy = jobExecutionEnv.execute();
- // Assert.assertNotNull(jobProxy);
- // } catch (ExecutionException | InterruptedException e) {
- // throw new RuntimeException(e);
- // }
+ JobProxy jobProxy = null;
+ try {
+ jobProxy = jobExecutionEnv.execute();
+ jobProxy.waitForJobComplete();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ // TODO throw exception after fix sink.setTypeInfo in ConnectorInstanceLoader
+ // throw new RuntimeException(e);
+ }
}
@After
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
similarity index 73%
rename from seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
rename to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
index a3c6d2380..0a6c519d7 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/NonCompletableFuture.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,12 +21,12 @@ import java.util.concurrent.CompletableFuture;
/**
* A future which prevents completion by outside caller
*/
-public class NonCompletableFuture<T> extends CompletableFuture<T> {
+public class PassiveCompletableFuture<T> extends CompletableFuture<T> {
- public NonCompletableFuture() {
+ public PassiveCompletableFuture() {
}
- public NonCompletableFuture(CompletableFuture<T> chainedFuture) {
+ public PassiveCompletableFuture(CompletableFuture<T> chainedFuture) {
chainedFuture.whenComplete((r, t) -> {
if (t != null) {
internalCompleteExceptionally(t);
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 90b74350d..5ad043ed0 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -23,4 +23,6 @@ public interface Job {
long getJobId();
void submitJob() throws ExecutionException, InterruptedException;
+
+ void waitForJobComplete();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
index ec10e3d8a..3d3f0b225 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
@@ -61,9 +61,6 @@ public final class SeaTunnelPrintMessageCodec {
return clientMessage;
}
- /**
- *
- */
public static java.lang.String decodeRequest(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
//empty initial frame
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
index 36ec48de4..3762d21ca 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelSubmitJobCodec.java
@@ -23,7 +23,10 @@ import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET
import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
@@ -36,19 +39,28 @@ import com.hazelcast.client.impl.protocol.codec.builtin.DataCodec;
* and regenerate it.
*/
-@Generated("f0fdf747fe01901765dedbe5a527bb0f")
+@Generated("ebea440b36898863958c102f47603fee")
public final class SeaTunnelSubmitJobCodec {
//hex: 0xDE0200
public static final int REQUEST_MESSAGE_TYPE = 14549504;
//hex: 0xDE0201
public static final int RESPONSE_MESSAGE_TYPE = 14549505;
- private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
private SeaTunnelSubmitJobCodec() {
}
- public static ClientMessage encodeRequest(com.hazelcast.internal.serialization.Data jobImmutableInformation) {
+ public static class RequestParameters {
+
+ public long jobId;
+
+ public com.hazelcast.internal.serialization.Data jobImmutableInformation;
+ }
+
+ public static ClientMessage encodeRequest(long jobId,
+ com.hazelcast.internal.serialization.Data jobImmutableInformation) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(false);
clientMessage.setOperationName("SeaTunnel.SubmitJob");
@@ -56,19 +68,19 @@ public final class SeaTunnelSubmitJobCodec {
new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+ encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
clientMessage.add(initialFrame);
DataCodec.encode(clientMessage, jobImmutableInformation);
return clientMessage;
}
- /**
- *
- */
- public static com.hazelcast.internal.serialization.Data decodeRequest(ClientMessage clientMessage) {
+ public static SeaTunnelSubmitJobCodec.RequestParameters decodeRequest(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
- //empty initial frame
- iterator.next();
- return DataCodec.decode(iterator);
+ RequestParameters request = new RequestParameters();
+ ClientMessage.Frame initialFrame = iterator.next();
+ request.jobId = decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+ request.jobImmutableInformation = DataCodec.decode(iterator);
+ return request;
}
public static ClientMessage encodeResponse() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
similarity index 60%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
index ec10e3d8a..eee977992 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelWaitForJobCompleteCodec.java
@@ -23,11 +23,14 @@ import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET
import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
-import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
/*
* This file is auto-generated by the Hazelcast Client Protocol Code Generator.
@@ -36,59 +39,56 @@ import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
* and regenerate it.
*/
-@Generated("c0a6d0c9d7eb912e8b10861931a0a695")
-public final class SeaTunnelPrintMessageCodec {
- //hex: 0xDE0100
- public static final int REQUEST_MESSAGE_TYPE = 14549248;
- //hex: 0xDE0101
- public static final int RESPONSE_MESSAGE_TYPE = 14549249;
- private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
- private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+@Generated("45a79cdd8ea874bd3c99b414d5f7639f")
+public final class SeaTunnelWaitForJobCompleteCodec {
+ //hex: 0xDE0300
+ public static final int REQUEST_MESSAGE_TYPE = 14549760;
+ //hex: 0xDE0301
+ public static final int RESPONSE_MESSAGE_TYPE = 14549761;
+ private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+ private static final int RESPONSE_JOB_STATUS_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+ private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_JOB_STATUS_FIELD_OFFSET + INT_SIZE_IN_BYTES;
- private SeaTunnelPrintMessageCodec() {
+ private SeaTunnelWaitForJobCompleteCodec() {
}
- public static ClientMessage encodeRequest(java.lang.String message) {
+ public static ClientMessage encodeRequest(long jobId) {
ClientMessage clientMessage = ClientMessage.createForEncode();
- clientMessage.setRetryable(false);
- clientMessage.setOperationName("SeaTunnel.PrintMessage");
+ clientMessage.setRetryable(true);
+ clientMessage.setOperationName("SeaTunnel.WaitForJobComplete");
ClientMessage.Frame initialFrame =
new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+ encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
clientMessage.add(initialFrame);
- StringCodec.encode(clientMessage, message);
return clientMessage;
}
- /**
- *
- */
- public static java.lang.String decodeRequest(ClientMessage clientMessage) {
+ public static long decodeRequest(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
- //empty initial frame
- iterator.next();
- return StringCodec.decode(iterator);
+ ClientMessage.Frame initialFrame = iterator.next();
+ return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
}
- public static ClientMessage encodeResponse(java.lang.String response) {
+ public static ClientMessage encodeResponse(int jobStatus) {
ClientMessage clientMessage = ClientMessage.createForEncode();
ClientMessage.Frame initialFrame =
new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET, jobStatus);
clientMessage.add(initialFrame);
- StringCodec.encode(clientMessage, response);
return clientMessage;
}
/**
*
*/
- public static java.lang.String decodeResponse(ClientMessage clientMessage) {
+ public static int decodeResponse(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
- //empty initial frame
- iterator.next();
- return StringCodec.decode(iterator);
+ ClientMessage.Frame initialFrame = iterator.next();
+ return decodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index fb4b29e2b..19f616d29 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -48,9 +48,35 @@ methods:
retryable: false
partitionIdentifier: -1
params:
+ - name: jobId
+ type: long
+ nullable: false
+ since: 2.0
+ doc: ''
- name: jobImmutableInformation
type: Data
nullable: false
since: 2.0
doc: ''
response: {}
+
+ - id: 3
+ name: waitForJobComplete
+ since: 2.0
+ doc: ''
+ request:
+ retryable: true
+ partitionIdentifier: -1
+ params:
+ - name: jobId
+ type: long
+ nullable: false
+ since: 2.0
+ doc: ''
+ response:
+ params:
+ - name: jobStatus
+ type: int
+ nullable: false
+ since: 2.0
+ doc: ''
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 09a1454e9..3b0cf8dd6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,8 +17,10 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.instance.impl.Node;
@@ -35,8 +37,10 @@ import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import lombok.NonNull;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -54,6 +58,8 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
private final SeaTunnelConfig seaTunnelConfig;
+ private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
+
public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
this.logger = node.getLogger(getClass());
this.liveOperationRegistry = new LiveOperationRegistry();
@@ -116,23 +122,39 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
/**
* call by client to submit job
*/
- @SuppressWarnings("checkstyle:MagicNumber")
- public NonCompletableFuture<Void> submitJob(Data jobImmutableInformation) {
+ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
JobMaster jobMaster = new JobMaster(jobImmutableInformation, this.nodeEngine, executorService);
executorService.submit(() -> {
try {
jobMaster.init();
- jobMaster.run();
+ runningJobMasterMap.put(jobId, jobMaster);
} catch (Throwable e) {
- LOGGER.severe("submit job error: " + e.getMessage());
+ LOGGER.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e)));
voidCompletableFuture.completeExceptionally(e);
} finally {
// We specify that when init is complete, the submitJob is complete
voidCompletableFuture.complete(null);
}
- //jobMaster.run();
+
+ try {
+ jobMaster.run();
+ } finally {
+ runningJobMasterMap.remove(jobId);
+ }
});
- return new NonCompletableFuture<>(voidCompletableFuture);
+ return new PassiveCompletableFuture(voidCompletableFuture);
+ }
+
+ public PassiveCompletableFuture<JobStatus> waitForJobComplete(long jobId) {
+ JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+ if (runningJobMaster == null) {
+ // TODO Get Job Status from JobHistoryStorage
+ CompletableFuture<JobStatus> future = new CompletableFuture<>();
+ future.complete(JobStatus.FINISHED);
+ return new PassiveCompletableFuture<>(future);
+ } else {
+ return runningJobMaster.getJobMasterCompleteFuture();
+ }
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 2f54e7f41..1b1bcbb21 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -25,7 +25,7 @@ import static java.util.stream.Collectors.partitioningBy;
import static java.util.stream.Collectors.toList;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.Task;
@@ -117,7 +117,7 @@ public class TaskExecutionService {
uncheckRun(startedLatch::await);
}
- public NonCompletableFuture<TaskExecutionState> deployTask(
+ public PassiveCompletableFuture<TaskExecutionState> deployTask(
@NonNull Data taskImmutableInformation
) {
CompletableFuture<TaskExecutionState> resultFuture = new CompletableFuture<>();
@@ -134,7 +134,8 @@ public class TaskExecutionService {
// TODO We need add a method to cancel task
CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
- TaskGroupExecutionTracker executionTracker = new TaskGroupExecutionTracker(cancellationFuture, taskGroup, resultFuture);
+ TaskGroupExecutionTracker executionTracker =
+ new TaskGroupExecutionTracker(cancellationFuture, taskGroup, resultFuture);
ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap = new ConcurrentHashMap<>();
final Map<Boolean, List<Task>> byCooperation =
tasks.stream()
@@ -152,7 +153,7 @@ public class TaskExecutionService {
logger.severe(ExceptionUtils.getMessage(t));
resultFuture.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
}
- return new NonCompletableFuture<>(resultFuture);
+ return new PassiveCompletableFuture<>(resultFuture);
}
private final class BlockingWorker implements Runnable {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 42464bc5d..00df7ebef 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineState;
@@ -62,13 +62,13 @@ public class PhysicalPlan {
* in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} whenComplete method will be called.
*/
private final CompletableFuture<JobStatus> jobEndFuture;
- private final NonCompletableFuture<JobStatus> nonCompletableFuture;
+ private final PassiveCompletableFuture<JobStatus> passiveCompletableFuture;
/**
* This future only can completion by the {@link SubPlan } subPlanFuture.
* When subPlanFuture completed, this NonCompletableFuture's whenComplete method will be called.
*/
- private final NonCompletableFuture<PipelineState>[] waitForCompleteBySubPlan;
+ private final PassiveCompletableFuture<PipelineState>[] waitForCompleteBySubPlan;
private final ExecutorService executorService;
@@ -76,7 +76,7 @@ public class PhysicalPlan {
@NonNull ExecutorService executorService,
@NonNull JobImmutableInformation jobImmutableInformation,
long initializationTimestamp,
- @NonNull NonCompletableFuture<PipelineState>[] waitForCompleteBySubPlan) {
+ @NonNull PassiveCompletableFuture<PipelineState>[] waitForCompleteBySubPlan) {
this.executorService = executorService;
this.jobImmutableInformation = jobImmutableInformation;
stateTimestamps = new long[JobStatus.values().length];
@@ -84,7 +84,7 @@ public class PhysicalPlan {
this.jobStatus.set(JobStatus.CREATED);
this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
this.jobEndFuture = new CompletableFuture<>();
- this.nonCompletableFuture = new NonCompletableFuture<>(jobEndFuture);
+ this.passiveCompletableFuture = new PassiveCompletableFuture<>(jobEndFuture);
this.waitForCompleteBySubPlan = waitForCompleteBySubPlan;
this.pipelineList = pipelineList;
if (pipelineList.isEmpty()) {
@@ -124,14 +124,14 @@ public class PhysicalPlan {
});
}
- public NonCompletableFuture<Void> cancelJob() {
+ public PassiveCompletableFuture<Void> cancelJob() {
CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
// TODO Implement cancel pipeline in job.
return null;
});
cancelFuture.complete(null);
- return new NonCompletableFuture<>(cancelFuture);
+ return new PassiveCompletableFuture<>(cancelFuture);
}
public List<SubPlan> getPipelineList() {
@@ -173,8 +173,8 @@ public class PhysicalPlan {
}
}
- public NonCompletableFuture<JobStatus> getJobEndCompletableFuture() {
- return this.nonCompletableFuture;
+ public PassiveCompletableFuture<JobStatus> getJobEndCompletableFuture() {
+ return this.passiveCompletableFuture;
}
public JobImmutableInformation getJobImmutableInformation() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index d2b2e1950..a2b71303d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -114,7 +114,7 @@ public class PhysicalPlanGenerator {
public PhysicalPlan generate() {
// TODO Determine which tasks do not need to be restored according to state
- CopyOnWriteArrayList<NonCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
+ CopyOnWriteArrayList<PassiveCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
new CopyOnWriteArrayList<>();
final int totalPipelineNum = pipelines.size();
@@ -122,14 +122,14 @@ public class PhysicalPlanGenerator {
final int pipelineId = pipeline.getId();
final List<ExecutionEdge> edges = pipeline.getEdges();
- CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList =
+ CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList =
new CopyOnWriteArrayList<>();
List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
List<PhysicalVertex> coordinatorVertexList =
getEnumeratorTask(sources, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList);
coordinatorVertexList.addAll(
- getCommitterTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
+ getCommitterTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
List<PhysicalVertex> physicalVertexList =
getSourceTask(edges, sources, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList);
@@ -138,24 +138,24 @@ public class PhysicalPlanGenerator {
getPartitionTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
CompletableFuture<PipelineState> pipelineFuture = new CompletableFuture<>();
- waitForCompleteBySubPlanList.add(new NonCompletableFuture<>(pipelineFuture));
+ waitForCompleteBySubPlanList.add(new PassiveCompletableFuture<>(pipelineFuture));
return new SubPlan(pipelineId,
- totalPipelineNum,
- initializationTimestamp,
- physicalVertexList,
- coordinatorVertexList,
- pipelineFuture,
- waitForCompleteByPhysicalVertexList.toArray(
- new NonCompletableFuture[waitForCompleteByPhysicalVertexList.size()]),
- jobImmutableInformation);
+ totalPipelineNum,
+ initializationTimestamp,
+ physicalVertexList,
+ coordinatorVertexList,
+ pipelineFuture,
+ waitForCompleteByPhysicalVertexList.toArray(
+ new PassiveCompletableFuture[waitForCompleteByPhysicalVertexList.size()]),
+ jobImmutableInformation);
});
return new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
- executorService,
- jobImmutableInformation,
- initializationTimestamp,
- waitForCompleteBySubPlanList.toArray(new NonCompletableFuture[waitForCompleteBySubPlanList.size()]));
+ executorService,
+ jobImmutableInformation,
+ initializationTimestamp,
+ waitForCompleteBySubPlanList.toArray(new PassiveCompletableFuture[waitForCompleteBySubPlanList.size()]));
}
private List<SourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
@@ -167,7 +167,7 @@ public class PhysicalPlanGenerator {
private List<PhysicalVertex> getCommitterTask(List<ExecutionEdge> edges,
int pipelineIndex,
int totalPipelineNum,
- CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+ CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
AtomicInteger atomicInteger = new AtomicInteger(-1);
List<ExecutionEdge> collect = edges.stream().filter(s -> s.getRightVertex().getAction() instanceof SinkAction)
.collect(Collectors.toList());
@@ -184,27 +184,27 @@ public class PhysicalPlanGenerator {
if (sinkAggregatedCommitter.isPresent()) {
long taskGroupID = idGenerator.getNextId();
SinkAggregatedCommitterTask<?> t =
- new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s,
- sinkAggregatedCommitter.get());
+ new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
+ new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s,
+ sinkAggregatedCommitter.get());
committerTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+ waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
return new PhysicalVertex(idGenerator.getNextId(),
- atomicInteger.incrementAndGet(),
- executorService,
- collect.size(),
- new TaskGroupDefaultImpl(taskGroupID, "SinkAggregatedCommitterTask",
- Lists.newArrayList(t)),
- taskFuture,
- flakeIdGenerator,
- pipelineIndex,
- totalPipelineNum,
- null,
- jobImmutableInformation,
- initializationTimestamp,
- nodeEngine);
+ atomicInteger.incrementAndGet(),
+ executorService,
+ collect.size(),
+ new TaskGroupDefaultImpl(taskGroupID, "SinkAggregatedCommitterTask",
+ Lists.newArrayList(t)),
+ taskFuture,
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ null,
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine);
} else {
return null;
}
@@ -214,7 +214,7 @@ public class PhysicalPlanGenerator {
private List<PhysicalVertex> getPartitionTask(List<ExecutionEdge> edges,
int pipelineIndex,
int totalPipelineNum,
- CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+ CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
return edges.stream().filter(s -> s.getLeftVertex().getAction() instanceof PartitionTransformAction)
.map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
.map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
@@ -225,17 +225,17 @@ public class PhysicalPlanGenerator {
long taskGroupID = idGenerator.getNextId();
setFlowConfig(flow, i);
SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, convertToTaskID(taskIDPrefix, i)), i, flow);
+ new TaskLocation(taskGroupID, convertToTaskID(taskIDPrefix, i)), i, flow);
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+ waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
t.add(new PhysicalVertex(idGenerator.getNextId(),
i,
executorService,
flow.getAction().getParallelism(),
new TaskGroupDefaultImpl(taskGroupID, "PartitionTransformTask",
- Lists.newArrayList(seaTunnelTask)),
+ Lists.newArrayList(seaTunnelTask)),
taskFuture,
flakeIdGenerator,
pipelineIndex,
@@ -252,16 +252,16 @@ public class PhysicalPlanGenerator {
private List<PhysicalVertex> getEnumeratorTask(List<SourceAction<?, ?, ?>> sources,
int pipelineIndex,
int totalPipelineNum,
- CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+ CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
AtomicInteger atomicInteger = new AtomicInteger(-1);
return sources.stream().map(s -> {
long taskGroupID = idGenerator.getNextId();
SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s);
+ new TaskLocation(taskGroupID, convertToTaskID(idGenerator.getNextId(), 1)), s);
enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, t.getTaskID()));
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+ waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
return new PhysicalVertex(idGenerator.getNextId(),
atomicInteger.incrementAndGet(),
@@ -283,7 +283,7 @@ public class PhysicalPlanGenerator {
List<SourceAction<?, ?, ?>> sources,
int pipelineIndex,
int totalPipelineNum,
- CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
+ CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList) {
return sources.stream()
.map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
.flatMap(flow -> {
@@ -297,59 +297,60 @@ public class PhysicalPlanGenerator {
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
int finalParallelismIndex = i;
List<SeaTunnelTask> taskList =
- flows.stream().map(f -> {
- setFlowConfig(f, finalParallelismIndex);
- long taskIDPrefix = flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> idGenerator.getNextId());
- if (f instanceof PhysicalExecutionFlow) {
- return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID,
- convertToTaskID(taskIDPrefix, finalParallelismIndex)),
- finalParallelismIndex, f);
- } else {
- return new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID,
- convertToTaskID(taskIDPrefix, finalParallelismIndex)),
- finalParallelismIndex, f);
- }
- }).collect(Collectors.toList());
+ flows.stream().map(f -> {
+ setFlowConfig(f, finalParallelismIndex);
+ long taskIDPrefix =
+ flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> idGenerator.getNextId());
+ if (f instanceof PhysicalExecutionFlow) {
+ return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
+ new TaskLocation(taskGroupID,
+ convertToTaskID(taskIDPrefix, finalParallelismIndex)),
+ finalParallelismIndex, f);
+ } else {
+ return new TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
+ new TaskLocation(taskGroupID,
+ convertToTaskID(taskIDPrefix, finalParallelismIndex)),
+ finalParallelismIndex, f);
+ }
+ }).collect(Collectors.toList());
Set<URL> jars =
- taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
+ taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
CompletableFuture<TaskExecutionState> taskFuture = new CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new NonCompletableFuture<>(taskFuture));
+ waitForCompleteByPhysicalVertexList.add(new PassiveCompletableFuture<>(taskFuture));
// TODO We need give every task a appropriate name
if (taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
// contains IntermediateExecutionFlow in task group
t.add(new PhysicalVertex(idGenerator.getNextId(),
- i,
- executorService,
- flow.getAction().getParallelism(),
- new TaskGroupWithIntermediateQueue(taskGroupID, "SourceTask",
- taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
- taskFuture,
- flakeIdGenerator,
- pipelineIndex,
- totalPipelineNum,
- jars,
- jobImmutableInformation,
- initializationTimestamp,
- nodeEngine));
+ i,
+ executorService,
+ flow.getAction().getParallelism(),
+ new TaskGroupWithIntermediateQueue(taskGroupID, "SourceTask",
+ taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
+ taskFuture,
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ jars,
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine));
} else {
t.add(new PhysicalVertex(idGenerator.getNextId(),
- i,
- executorService,
- flow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(taskGroupID, "SourceTask",
- taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
- taskFuture,
- flakeIdGenerator,
- pipelineIndex,
- totalPipelineNum,
- jars,
- jobImmutableInformation,
- initializationTimestamp,
- nodeEngine));
+ i,
+ executorService,
+ flow.getAction().getParallelism(),
+ new TaskGroupDefaultImpl(taskGroupID, "SourceTask",
+ taskList.stream().map(task -> (Task) task).collect(Collectors.toList())),
+ taskFuture,
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ jars,
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine));
}
}
return t.stream();
@@ -380,14 +381,15 @@ public class PhysicalPlanGenerator {
flow.setConfig(config);
} else if (flow.getAction() instanceof PartitionTransformAction) {
PartitionConfig config =
- new PartitionConfig(((PartitionTransformAction) flow.getAction()).getPartitionTransformation().getPartitionCount(),
- ((PartitionTransformAction) flow.getAction()).getPartitionTransformation().getTargetCount(),
- parallelismIndex);
+ new PartitionConfig(
+ ((PartitionTransformAction) flow.getAction()).getPartitionTransformation().getPartitionCount(),
+ ((PartitionTransformAction) flow.getAction()).getPartitionTransformation().getTargetCount(),
+ parallelismIndex);
flow.setConfig(config);
}
} else if (f instanceof IntermediateExecutionFlow) {
((IntermediateExecutionFlow<IntermediateQueueConfig>) f)
- .setConfig(new IntermediateQueueConfig(((IntermediateExecutionFlow<?>) f).getQueue().getId()));
+ .setConfig(new IntermediateQueueConfig(((IntermediateExecutionFlow<?>) f).getQueue().getId()));
} else {
throw new UnknownFlowException(f);
}
@@ -406,13 +408,14 @@ public class PhysicalPlanGenerator {
*/
private static List<Flow> splitSinkFromFlow(Flow flow) {
List<PhysicalExecutionFlow<?, ?>> sinkFlows =
- flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow).map(f -> (PhysicalExecutionFlow<?, ?>) f)
- .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
+ flow.getNext().stream().filter(f -> f instanceof PhysicalExecutionFlow)
+ .map(f -> (PhysicalExecutionFlow<?, ?>) f)
+ .filter(f -> f.getAction() instanceof SinkAction).collect(Collectors.toList());
List<Flow> allFlows = new ArrayList<>();
flow.getNext().removeAll(sinkFlows);
sinkFlows.forEach(s -> {
IntermediateQueue queue = new IntermediateQueue(s.getAction().getId(),
- s.getAction().getName() + "-Queue", s.getAction().getParallelism());
+ s.getAction().getName() + "-Queue", s.getAction().getParallelism());
IntermediateExecutionFlow<?> intermediateFlow = new IntermediateExecutionFlow<>(queue);
flow.getNext().add(intermediateFlow);
IntermediateExecutionFlow<?> intermediateFlowQuote = new IntermediateExecutionFlow<>(queue);
@@ -422,15 +425,15 @@ public class PhysicalPlanGenerator {
if (flow.getNext().size() > sinkFlows.size()) {
allFlows.addAll(flow.getNext().stream().flatMap(f -> splitSinkFromFlow(f).stream())
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()));
}
return allFlows;
}
private static boolean sourceWithSink(PhysicalExecutionFlow<?, ?> flow) {
return flow.getAction() instanceof SinkAction ||
- flow.getNext().stream().map(f -> (PhysicalExecutionFlow<?, ?>) f).map(PhysicalPlanGenerator::sourceWithSink)
- .collect(Collectors.toList()).contains(true);
+ flow.getNext().stream().map(f -> (PhysicalExecutionFlow<?, ?>) f).map(PhysicalPlanGenerator::sourceWithSink)
+ .collect(Collectors.toList()).contains(true);
}
@SuppressWarnings("checkstyle:MagicNumber")
@@ -440,13 +443,13 @@ public class PhysicalPlanGenerator {
private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
List<Action> actions = edges.stream().filter(e -> e.getLeftVertex().getAction().equals(start))
- .map(e -> e.getRightVertex().getAction()).collect(Collectors.toList());
+ .map(e -> e.getRightVertex().getAction()).collect(Collectors.toList());
List<Flow> wrappers = actions.stream()
- .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
- .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
+ .filter(a -> a instanceof PartitionTransformAction || a instanceof SinkAction)
+ .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
wrappers.addAll(actions.stream()
- .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
- .map(a -> new PhysicalExecutionFlow<>(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
+ .filter(a -> !(a instanceof PartitionTransformAction || a instanceof SinkAction))
+ .map(a -> new PhysicalExecutionFlow<>(a, getNextWrapper(edges, a))).collect(Collectors.toList()));
return wrappers;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index ed823a74a..f11019fa0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -18,15 +18,12 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
-import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
-import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import com.hazelcast.cluster.Address;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -92,7 +89,7 @@ public class PhysicalVertex {
/**
* This future only can completion by the task run in {@link com.hazelcast.spi.impl.executionservice.ExecutionService }
*/
- private NonCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
+ private PassiveCompletableFuture<TaskExecutionState> waitForCompleteByExecutionService;
private final JobImmutableInformation jobImmutableInformation;
@@ -145,26 +142,35 @@ public class PhysicalVertex {
@SuppressWarnings("checkstyle:MagicNumber")
// This method must not throw an exception
public void deploy(@NonNull Address address) {
-
- TaskGroupImmutableInformation taskGroupImmutableInformation =
- new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
- nodeEngine.getSerializationService().toData(this.taskGroup),
- this.pluginJarsUrls);
-
- try {
- waitForCompleteByExecutionService = new NonCompletableFuture<>(
- nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
- address)
- .invoke());
- } catch (Throwable th) {
- LOGGER.severe(String.format("%s deploy error with Exception: %s",
- this.taskFullName,
- ExceptionUtils.getMessage(th)));
- updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
- taskFuture.complete(
- new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, null));
- }
+ /**
+ TaskGroupImmutableInformation taskGroupImmutableInformation =
+ new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+ nodeEngine.getSerializationService().toData(this.taskGroup),
+ this.pluginJarsUrls);
+
+ try {
+ waitForCompleteByExecutionService = new NonCompletableFuture<>(
+ nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+ address)
+ .invoke());
+ } catch (Throwable th) {
+ LOGGER.severe(String.format("%s deploy error with Exception: %s",
+ this.taskFullName,
+ ExceptionUtils.getMessage(th)));
+ updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
+ taskFuture.complete(
+ new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, null));
+ }*/
+
+ waitForCompleteByExecutionService = new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return new TaskExecutionState(flakeIdGenerator.newId(), ExecutionState.FINISHED, null);
+ }));
updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
waitForCompleteByExecutionService.whenComplete((v, t) -> {
@@ -176,13 +182,13 @@ public class PhysicalVertex {
updateTaskState(executionState.get(), v.getExecutionState());
if (v.getThrowable() != null) {
LOGGER.severe(String.format("%s end with state %s and Exception: %s",
- this.taskFullName,
- v.getExecutionState(),
- ExceptionUtils.getMessage(v.getThrowable())));
+ this.taskFullName,
+ v.getExecutionState(),
+ ExceptionUtils.getMessage(v.getThrowable())));
} else {
LOGGER.severe(String.format("%s end with state %s",
- this.taskFullName,
- v.getExecutionState()));
+ this.taskFullName,
+ v.getExecutionState()));
}
taskFuture.complete(v);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index a044cc7b8..2a7c06689 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineState;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -74,7 +74,7 @@ public class SubPlan {
* This future only can completion by the {@link PhysicalVertex } taskFuture.
* When the taskFuture in {@link PhysicalVertex} completed, The NonCompletableFuture's whenComplete method will be called
*/
- private final NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex;
+ private final PassiveCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex;
public SubPlan(int pipelineIndex,
int totalPipelineNum,
@@ -82,7 +82,7 @@ public class SubPlan {
@NonNull List<PhysicalVertex> physicalVertexList,
@NonNull List<PhysicalVertex> coordinatorVertexList,
@NonNull CompletableFuture<PipelineState> pipelineFuture,
- @NonNull NonCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex,
+ @NonNull PassiveCompletableFuture<TaskExecutionState>[] waitForCompleteByPhysicalVertex,
@NonNull JobImmutableInformation jobImmutableInformation) {
this.pipelineIndex = pipelineIndex;
this.pipelineFuture = pipelineFuture;
@@ -184,14 +184,14 @@ public class SubPlan {
}
}
- public NonCompletableFuture<Void> cancelPipeline() {
+ public PassiveCompletableFuture<Void> cancelPipeline() {
CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
// TODO Implement cancel tasks in pipeline.
return null;
});
cancelFuture.complete(null);
- return new NonCompletableFuture<>(cancelFuture);
+ return new PassiveCompletableFuture<>(cancelFuture);
}
public void failedWithNoEnoughResource() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index cfb3352bc..1cdc0b5f7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.master;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -45,7 +45,7 @@ public class JobMaster implements Runnable {
private LogicalDag logicalDag;
private PhysicalPlan physicalPlan;
- private final Data jobImmutableInformation;
+ private final Data jobImmutableInformationData;
private final NodeEngine nodeEngine;
@@ -57,10 +57,12 @@ public class JobMaster implements Runnable {
private CompletableFuture<JobStatus> jobMasterCompleteFuture = new CompletableFuture<>();
- public JobMaster(@NonNull Data jobImmutableInformation,
+ private JobImmutableInformation jobImmutableInformation;
+
+ public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService) {
- this.jobImmutableInformation = jobImmutableInformation;
+ this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
this.executorService = executorService;
flakeIdGenerator =
@@ -70,15 +72,17 @@ public class JobMaster implements Runnable {
}
public void init() throws Exception {
- JobImmutableInformation jobInformation = nodeEngine.getSerializationService().toObject(jobImmutableInformation);
- LOGGER.info("Job [" + jobInformation.getJobId() + "] submit");
- LOGGER.info("Job [" + jobInformation.getJobId() + "] jar urls " + jobInformation.getPluginJarsUrls());
+ jobImmutableInformation = nodeEngine.getSerializationService().toObject(
+ jobImmutableInformationData);
+ LOGGER.info("Job [" + jobImmutableInformation.getJobId() + "] submit");
+ LOGGER.info(
+ "Job [" + jobImmutableInformation.getJobId() + "] jar urls " + jobImmutableInformation.getPluginJarsUrls());
// TODO Use classloader load the connector jars and deserialize logicalDag
- this.logicalDag = nodeEngine.getSerializationService().toObject(jobInformation.getLogicalDag());
+ this.logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
- jobInformation,
+ jobImmutableInformation,
System.currentTimeMillis(),
executorService,
flakeIdGenerator);
@@ -88,9 +92,10 @@ public class JobMaster implements Runnable {
@Override
public void run() {
try {
- NonCompletableFuture<JobStatus> jobStatusNonCompletableFuture = physicalPlan.getJobEndCompletableFuture();
+ PassiveCompletableFuture<JobStatus> jobStatusPassiveCompletableFuture =
+ physicalPlan.getJobEndCompletableFuture();
- jobStatusNonCompletableFuture.whenComplete((v, t) -> {
+ jobStatusPassiveCompletableFuture.whenComplete((v, t) -> {
// We need not handle t, Because we will not return t from physicalPlan
if (JobStatus.FAILING.equals(v)) {
cleanJob();
@@ -120,4 +125,12 @@ public class JobMaster implements Runnable {
public ResourceManager getResourceManager() {
return resourceManager;
}
+
+ public PassiveCompletableFuture<JobStatus> getJobMasterCompleteFuture() {
+ return new PassiveCompletableFuture<>(jobMasterCompleteFuture);
+ }
+
+ public JobImmutableInformation getJobImmutableInformation() {
+ return jobImmutableInformation;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AbstractJobAsyncOperation.java
similarity index 57%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AbstractJobAsyncOperation.java
index d66a5d66c..d27b62a28 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AbstractJobAsyncOperation.java
@@ -17,43 +17,30 @@
package org.apache.seatunnel.engine.server.operation;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
-import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
-
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import java.io.IOException;
-public class CheckpointTriggerOperation extends AsyncOperation {
- private CheckpointBarrier checkpointBarrier;
-
- public CheckpointTriggerOperation() {
- }
+public abstract class AbstractJobAsyncOperation extends AsyncOperation {
+ protected long jobId;
- public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
- this.checkpointBarrier = checkpointBarrier;
+ public AbstractJobAsyncOperation() {
}
- @Override
- public int getClassId() {
- return OperationDataSerializerHook.SUBMIT_OPERATOR;
+ public AbstractJobAsyncOperation(long jobId) {
+ this.jobId = jobId;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
- out.writeObject(checkpointBarrier);
+ super.writeInternal(out);
+ out.writeLong(jobId);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- checkpointBarrier = in.readObject(CheckpointBarrier.class);
- }
-
- @Override
- protected NonCompletableFuture<?> doRun() throws Exception {
- // TODO: All source Vertexes executed
- return null;
+ super.readInternal(in);
+ jobId = in.readLong();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
index 543197559..56701e88b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -23,7 +23,7 @@ import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
import static com.hazelcast.spi.impl.operationservice.ExceptionAction.THROW_EXCEPTION;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
@@ -47,7 +47,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
@Override
public final void run() {
- NonCompletableFuture<?> future;
+ PassiveCompletableFuture<?> future;
try {
future = doRun();
} catch (Exception e) {
@@ -58,7 +58,7 @@ public abstract class AsyncOperation extends Operation implements IdentifiedData
future.whenComplete(withTryCatch(getLogger(), (r, f) -> doSendResponse(f != null ? peel(f) : r)));
}
- protected abstract NonCompletableFuture<?> doRun() throws Exception;
+ protected abstract PassiveCompletableFuture<?> doRun() throws Exception;
@Override
public final boolean returnsResponse() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
index d66a5d66c..6de6b3f75 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.server.operation;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
@@ -52,7 +52,7 @@ public class CheckpointTriggerOperation extends AsyncOperation {
}
@Override
- protected NonCompletableFuture<?> doRun() throws Exception {
+ protected PassiveCompletableFuture<?> doRun() throws Exception {
// TODO: All source Vertexes executed
return null;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
index d658bd398..8a36d9ea4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.engine.server.operation;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
import com.hazelcast.internal.nio.IOUtil;
@@ -40,9 +40,9 @@ public class DeployTaskOperation extends AsyncOperation {
}
@Override
- protected NonCompletableFuture<?> doRun() throws Exception {
- SeaTunnelServer server = getService();
- return server.getTaskExecutionService().deployTask(taskImmutableInformation);
+ protected PassiveCompletableFuture<?> doRun() throws Exception {
+ TaskExecutionService taskExecutionService = getService();
+ return taskExecutionService.deployTask(taskImmutableInformation);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
index 419712111..5c1e5b0fc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/SubmitJobOperation.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.server.operation;
-import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
@@ -29,13 +29,14 @@ import lombok.NonNull;
import java.io.IOException;
-public class SubmitJobOperation extends AsyncOperation {
+public class SubmitJobOperation extends AbstractJobAsyncOperation {
private Data jobImmutableInformation;
public SubmitJobOperation() {
}
- public SubmitJobOperation(@NonNull Data jobImmutableInformation) {
+ public SubmitJobOperation(long jobId, @NonNull Data jobImmutableInformation) {
+ super(jobId);
this.jobImmutableInformation = jobImmutableInformation;
}
@@ -57,9 +58,8 @@ public class SubmitJobOperation extends AsyncOperation {
}
@Override
- protected NonCompletableFuture<?> doRun() throws Exception {
+ protected PassiveCompletableFuture<?> doRun() throws Exception {
SeaTunnelServer seaTunnelServer = getService();
- NonCompletableFuture<Void> voidCompletableFuture = seaTunnelServer.submitJob(jobImmutableInformation);
- return voidCompletableFuture;
+ return seaTunnelServer.submitJob(jobId, jobImmutableInformation);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
similarity index 51%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
index 5eb391b31..65724c3c0 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/WaitForJobCompleteOperation.java
@@ -15,25 +15,30 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.server.operation;
-import org.apache.seatunnel.engine.common.Constant;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
-import lombok.NonNull;
+public class WaitForJobCompleteOperation extends AbstractJobAsyncOperation {
-public class JobClient {
- private SeaTunnelHazelcastClient hazelcastClient;
+ public WaitForJobCompleteOperation() {
+ super();
+ }
- public JobClient(@NonNull SeaTunnelHazelcastClient hazelcastClient) {
- this.hazelcastClient = hazelcastClient;
+ public WaitForJobCompleteOperation(long jobId) {
+ super(jobId);
}
- public long getNewJobId() {
- return hazelcastClient.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+ @Override
+ protected PassiveCompletableFuture<?> doRun() throws Exception {
+ SeaTunnelServer service = getService();
+ return service.waitForJobComplete(jobId);
}
- public JobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
- return new JobProxy(hazelcastClient, jobImmutableInformation);
+ @Override
+ public int getClassId() {
+ return OperationDataSerializerHook.WAIT_FORM_JOB_COMPLETE_OPERATOR;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 2b6c78488..9876f489e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.protocol.task;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
import com.hazelcast.client.impl.protocol.MessageTaskFactory;
import com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider;
@@ -42,7 +43,11 @@ public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryPr
}
private void initFactories() {
- factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
- factories.put(SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
+ factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
+ factories.put(SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
+ factories.put(SeaTunnelWaitForJobCompleteCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) -> new WaitForJobCompleteTask(clientMessage, node, connection));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
index 5cb228b3e..68ec6b69c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
@@ -23,10 +23,9 @@ import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.nio.Connection;
-import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.impl.operationservice.Operation;
-public class SubmitJobTask extends AbstractSeaTunnelMessageTask<Data, Void> {
+public class SubmitJobTask extends AbstractSeaTunnelMessageTask<SeaTunnelSubmitJobCodec.RequestParameters, Void> {
protected SubmitJobTask(ClientMessage clientMessage, Node node, Connection connection) {
super(clientMessage, node, connection,
@@ -36,7 +35,7 @@ public class SubmitJobTask extends AbstractSeaTunnelMessageTask<Data, Void> {
@Override
protected Operation prepareOperation() {
- return new SubmitJobOperation(parameters);
+ return new SubmitJobOperation(parameters.jobId, parameters.jobImmutableInformation);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
similarity index 68%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
index 5cb228b3e..813063b19 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SubmitJobTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/WaitForJobCompleteTask.java
@@ -17,35 +17,34 @@
package org.apache.seatunnel.engine.server.protocol.task;
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
-import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
+import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.nio.Connection;
-import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.impl.operationservice.Operation;
-public class SubmitJobTask extends AbstractSeaTunnelMessageTask<Data, Void> {
-
- protected SubmitJobTask(ClientMessage clientMessage, Node node, Connection connection) {
+public class WaitForJobCompleteTask extends AbstractSeaTunnelMessageTask<Long, JobStatus> {
+ protected WaitForJobCompleteTask(ClientMessage clientMessage, Node node, Connection connection) {
super(clientMessage, node, connection,
- SeaTunnelSubmitJobCodec::decodeRequest,
- x -> SeaTunnelSubmitJobCodec.encodeResponse());
+ SeaTunnelWaitForJobCompleteCodec::decodeRequest,
+ x -> SeaTunnelWaitForJobCompleteCodec.encodeResponse(x.ordinal()));
}
@Override
protected Operation prepareOperation() {
- return new SubmitJobOperation(parameters);
+ return new WaitForJobCompleteOperation(parameters);
}
@Override
public String getMethodName() {
- return "submitJob";
+ return "waitForJobComplete";
}
@Override
public Object[] getParameters() {
- return new Object[]{};
+ return new Object[0];
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 575b4bfa1..d5ade4d59 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.server.operation.CheckpointTriggerOperation;
import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
+import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -39,8 +40,9 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
public static final int PRINT_MESSAGE_OPERATOR = 0;
public static final int SUBMIT_OPERATOR = 1;
public static final int DEPLOY_TASK_OPERATOR = 2;
+ public static final int WAIT_FORM_JOB_COMPLETE_OPERATOR = 3;
- public static final int CHECKPOINT_TRIGGER_OPERATOR = 3;
+ public static final int CHECKPOINT_TRIGGER_OPERATOR = 4;
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -68,6 +70,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
return new SubmitJobOperation();
case DEPLOY_TASK_OPERATOR:
return new DeployTaskOperation();
+ case WAIT_FORM_JOB_COMPLETE_OPERATOR:
+ return new WaitForJobCompleteOperation();
case CHECKPOINT_TRIGGER_OPERATOR:
return new CheckpointTriggerOperation();
default:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 17c004ff2..f01dcc218 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -114,7 +115,11 @@ public class TaskTest {
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
- service.submitJob(nodeEngine.getSerializationService().toData(jobImmutableInformation));
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ service.submitJob(jobImmutableInformation.getJobId(),
+ nodeEngine.getSerializationService().toData(jobImmutableInformation));
+
+ Assert.assertNotNull(voidPassiveCompletableFuture);
}
@Test