You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this copy).
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/18 02:21:28 UTC
[incubator-seatunnel] branch st-engine updated: [Engine][Task] change SourceSplitEnumeratorTask use state machine (#2443)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 60b7796a3 [Engine][Task] change SourceSplitEnumeratorTask use state machine (#2443)
60b7796a3 is described below
commit 60b7796a3ec63b74bf252ce45d18a95e32e2f9a2
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Aug 18 10:21:23 2022 +0800
[Engine][Task] change SourceSplitEnumeratorTask use state machine (#2443)
* [Engine][Task] change SourceSplitEnumeratorTask use state machine
* [Engine][Task] change SourceSplitEnumeratorTask use state machine
---
.../seatunnel/engine/server/task/Progress.java | 3 +-
.../server/task/SourceSplitEnumeratorTask.java | 68 +++++++++++++++++-----
.../server/task/statemachine/EnumeratorState.java | 46 +++++++++++++++
3 files changed, 101 insertions(+), 16 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
index b34198f86..ac23c5257 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
@@ -47,9 +47,8 @@ public class Progress implements IdentifiedDataSerializable, Serializable {
madeProgress = true;
}
- public Progress done() {
+ public void done() { // NOTE(review): now returns void instead of Progress; callers that chained done().toState() must call toState() separately
isDone = true;
- return this;
}
public ProgressState toState() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 0c9517892..9f7d97f80 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
+import org.apache.seatunnel.engine.server.task.statemachine.EnumeratorState;
import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
@@ -52,12 +53,17 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
private Map<TaskLocation, Address> taskMemberMapping;
private Map<Long, TaskLocation> taskIDToTaskLocationMapping;
- private CompletableFuture<ProgressState> future;
+ private EnumeratorState currState; // set to CREATED by the constructor, INIT by init(), then advanced by stateProcess()
+
+ private CompletableFuture<Void> readerRegisterFuture; // completed by receivedReader() once maxReaderSize readers are registered
+ private CompletableFuture<Void> readerFinishFuture; // completed by readerFinished() when unfinishedReaders reaches zero
@Override
public void init() throws Exception {
+ currState = EnumeratorState.INIT;
super.init();
- future = new CompletableFuture<>();
+ readerRegisterFuture = new CompletableFuture<>();
+ readerFinishFuture = new CompletableFuture<>();
LOGGER.info("starting seatunnel source split enumerator task, source name: " + source.getName());
SeaTunnelSplitEnumeratorContext<SplitT> context = new SeaTunnelSplitEnumeratorContext<>(this.source.getParallelism(), this);
enumerator = this.source.getSource().createEnumerator(context);
@@ -73,30 +79,29 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
if (enumerator != null) {
enumerator.close();
}
- future.complete(progress.done().toState());
+ progress.done();
}
public SourceSplitEnumeratorTask(long jobID, TaskLocation taskID, SourceAction<?, SplitT, ?> source) {
super(jobID, taskID);
this.source = source;
+ this.currState = EnumeratorState.CREATED; // init() later moves the state to INIT
}
@NonNull
@Override
public ProgressState call() throws Exception {
- if (maxReaderSize == taskMemberMapping.size()) {
- LOGGER.info("received enough reader, starting enumerator...");
- enumerator.run();
- return future.get();
- } else {
- return progress.toState();
- }
+ stateProcess(); // NOTE(review): starting from INIT this recurses through every state up to CLOSED in one invocation, blocking on both reader futures
+ return progress.toState();
}
public void receivedReader(TaskLocation readerId, Address memberAddr) {
LOGGER.info("received reader register, readerID: " + readerId);
this.addTaskMemberMapping(readerId, memberAddr);
enumerator.registerReader((int) readerId.getTaskID());
+ if (maxReaderSize == taskMemberMapping.size()) { // last expected reader registered: releases waitReader() in stateProcess()
+ readerRegisterFuture.complete(null);
+ }
}
public void requestSplit(long taskID) {
@@ -119,12 +124,47 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
public void readerFinished(long taskID) {
removeTaskMemberMapping(taskID);
if (unfinishedReaders.decrementAndGet() == 0) {
- try {
+ readerFinishFuture.complete(null); // releases WAITING_FEEDBACK in stateProcess(), whose CLOSED case later calls close()
+ }
+ }
+
+ private void stateProcess() throws Exception { // advances currState one step, then recurses until the CLOSED or CANCELLING case returns
+ switch (currState) {
+ case INIT:
+ waitReader(); // blocks until receivedReader() has registered maxReaderSize readers
+ currState = EnumeratorState.READER_REGISTER_COMPLETE; // NOTE(review): skips the READY state declared in EnumeratorState
+ break;
+ case READER_REGISTER_COMPLETE:
+ currState = EnumeratorState.ASSIGN;
+ LOGGER.info("received enough reader, starting enumerator...");
+ enumerator.run();
+ break;
+ case ASSIGN:
+ currState = EnumeratorState.WAITING_FEEDBACK; // ASSIGN does no work of its own here
+ break;
+ case WAITING_FEEDBACK:
+ readerFinishFuture.join(); // blocks until readerFinished() counts unfinishedReaders down to zero
+ currState = EnumeratorState.READER_CLOSED;
+ break;
+ case READER_CLOSED:
+ currState = EnumeratorState.CLOSED;
+ break;
+ case CLOSED:
+ this.close();
+ return; // NOTE(review): currState stays CLOSED, so any further call() would run close() again; confirm close() tolerates that
+ // TODO support cancel by outside
+ case CANCELLING: // NOTE(review): no code in this change sets CANCELLING, so this branch is not reachable yet
this.close();
- } catch (Exception e) {
- throw new TaskRuntimeException(e);
- }
+ currState = EnumeratorState.CANCELED;
+ return;
+ default: // NOTE(review): CREATED, READY, CANCELED and FAILED have no case and end up here, so a call after cancellation throws
+ throw new IllegalArgumentException("Unknown Enumerator State: " + currState);
}
+ stateProcess(); // NOTE(review): tail recursion adds one stack frame per transition; a while loop would be equivalent
+ }
+
+ private void waitReader() {
+ readerRegisterFuture.join(); // completed by receivedReader()
}
public void removeTaskMemberMapping(long taskID) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
new file mode 100644
index 000000000..483c6c62d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.statemachine;
+
+import java.io.Serializable;
+
+/**
+ * The state of {@link org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask}.
+ * The task usually starts in the state {@code CREATED} and switches states according to this diagram:
+ * <p>
+ * CREATED -> INIT -> READY -> READER_REGISTER_COMPLETE -> ASSIGN -> WAITING_FEEDBACK -> READER_CLOSED -> CLOSED
+ * |        |          |                         |              |
+ * |        |          |                         |              |
+ * |        |          |                         |              |
+ * |        |          |                         |              |
+ * +--------+----------+-------------------------+--------------+--> CANCELLING ----> CANCELED
+ * ... -> FAILED
+ */
+public enum EnumeratorState implements Serializable { // NOTE(review): java.lang.Enum already implements Serializable, so this clause is redundant
+ CREATED,
+ INIT,
+ READY, // NOTE(review): SourceSplitEnumeratorTask.stateProcess() in this change never enters READY
+ READER_REGISTER_COMPLETE,
+ ASSIGN,
+ WAITING_FEEDBACK,
+ READER_CLOSED,
+ CLOSED,
+ CANCELLING, // NOTE(review): nothing in this change assigns CANCELLING yet
+ CANCELED,
+ FAILED // NOTE(review): nothing in this change assigns FAILED yet
+}