You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/18 02:21:28 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][Task] change SourceSplitEnumeratorTask use state machine (#2443)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 60b7796a3 [Engine][Task] change SourceSplitEnumeratorTask use state machine (#2443)
60b7796a3 is described below

commit 60b7796a3ec63b74bf252ce45d18a95e32e2f9a2
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Aug 18 10:21:23 2022 +0800

    [Engine][Task] change SourceSplitEnumeratorTask use state machine (#2443)
    
    * [Engine][Task] change SourceSplitEnumeratorTask use state machine
    
    * [Engine][Task] change SourceSplitEnumeratorTask use state machine
---
 .../seatunnel/engine/server/task/Progress.java     |  3 +-
 .../server/task/SourceSplitEnumeratorTask.java     | 68 +++++++++++++++++-----
 .../server/task/statemachine/EnumeratorState.java  | 46 +++++++++++++++
 3 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
index b34198f86..ac23c5257 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/Progress.java
@@ -47,9 +47,8 @@ public class Progress implements IdentifiedDataSerializable, Serializable {
         madeProgress = true;
     }
 
-    public Progress done() {
+    public void done() { // marks the task as finished (sets isDone); no longer returns this for chaining
         isDone = true;
-        return this;
     }
 
     public ProgressState toState() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 0c9517892..9f7d97f80 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
+import org.apache.seatunnel.engine.server.task.statemachine.EnumeratorState;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
@@ -52,12 +53,17 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     private Map<TaskLocation, Address> taskMemberMapping;
     private Map<Long, TaskLocation> taskIDToTaskLocationMapping;
 
-    private CompletableFuture<ProgressState> future;
+    private EnumeratorState currState; // current step of the enumerator state machine; advanced by stateProcess()
+
+    private CompletableFuture<Void> readerRegisterFuture; // completed once maxReaderSize readers have registered; awaited by waitReader()
+    private CompletableFuture<Void> readerFinishFuture; // completed when the last unfinished reader reports finished; awaited in WAITING_FEEDBACK
 
     @Override
     public void init() throws Exception {
+        currState = EnumeratorState.INIT;
         super.init();
-        future = new CompletableFuture<>();
+        readerRegisterFuture = new CompletableFuture<>();
+        readerFinishFuture = new CompletableFuture<>();
         LOGGER.info("starting seatunnel source split enumerator task, source name: " + source.getName());
         SeaTunnelSplitEnumeratorContext<SplitT> context = new SeaTunnelSplitEnumeratorContext<>(this.source.getParallelism(), this);
         enumerator = this.source.getSource().createEnumerator(context);
@@ -73,30 +79,29 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         if (enumerator != null) {
             enumerator.close();
         }
-        future.complete(progress.done().toState());
+        progress.done();
     }
 
     public SourceSplitEnumeratorTask(long jobID, TaskLocation taskID, SourceAction<?, SplitT, ?> source) {
         super(jobID, taskID);
         this.source = source;
+        this.currState = EnumeratorState.CREATED; // init() later moves the state to INIT
     }
 
     @NonNull
     @Override
     public ProgressState call() throws Exception {
-        if (maxReaderSize == taskMemberMapping.size()) {
-            LOGGER.info("received enough reader, starting enumerator...");
-            enumerator.run();
-            return future.get();
-        } else {
-            return progress.toState();
-        }
+        stateProcess(); // runs the state machine until it returns; NOTE(review): this blocks on reader registration/finish (the old code returned immediately until enough readers registered) - confirm the executor tolerates a blocking call()
+        return progress.toState();
     }
 
     public void receivedReader(TaskLocation readerId, Address memberAddr) {
         LOGGER.info("received reader register, readerID: " + readerId);
         this.addTaskMemberMapping(readerId, memberAddr);
         enumerator.registerReader((int) readerId.getTaskID());
+        if (maxReaderSize == taskMemberMapping.size()) { // every expected reader is registered: unblocks waitReader() in the INIT state
+            readerRegisterFuture.complete(null);
+        }
     }
 
     public void requestSplit(long taskID) {
@@ -119,12 +124,47 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     public void readerFinished(long taskID) {
         removeTaskMemberMapping(taskID);
         if (unfinishedReaders.decrementAndGet() == 0) {
-            try {
+            readerFinishFuture.complete(null); // last reader finished: unblocks the WAITING_FEEDBACK state
+        }
+    }
+
+    private void stateProcess() throws Exception { // advances currState one step, then recurses until the CLOSED or CANCELLING case returns
+        switch (currState) {
+            case INIT:
+                waitReader(); // blocks until all maxReaderSize readers have registered
+                currState = EnumeratorState.READER_REGISTER_COMPLETE;
+                break;
+            case READER_REGISTER_COMPLETE:
+                currState = EnumeratorState.ASSIGN; // set before run(); the ASSIGN case itself only moves on to WAITING_FEEDBACK
+                LOGGER.info("received enough reader, starting enumerator...");
+                enumerator.run();
+                break;
+            case ASSIGN:
+                currState = EnumeratorState.WAITING_FEEDBACK;
+                break;
+            case WAITING_FEEDBACK:
+                readerFinishFuture.join(); // blocks until readerFinished() sees the last reader finish
+                currState = EnumeratorState.READER_CLOSED;
+                break;
+            case READER_CLOSED:
+                currState = EnumeratorState.CLOSED;
+                break;
+            case CLOSED:
+                this.close(); // NOTE(review): state stays CLOSED, so a later call() would close again - confirm the executor stops once progress is done
+                return;
+            // TODO support cancel by outside (nothing visible here sets CANCELLING yet)
+            case CANCELLING:
                 this.close();
-            } catch (Exception e) {
-                throw new TaskRuntimeException(e);
-            }
+                currState = EnumeratorState.CANCELED;
+                return;
+            default:
+                throw new IllegalArgumentException("Unknown Enumerator State: " + currState); // reached for CREATED (init() not run), READY, CANCELED and FAILED
+        }
+        stateProcess(); // tail call for the next state; depth is bounded by the number of states
+    }
+
+    private void waitReader() { // blocks the caller until receivedReader() has seen maxReaderSize readers
+        readerRegisterFuture.join();
     }
 
     public void removeTaskMemberMapping(long taskID) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
new file mode 100644
index 000000000..483c6c62d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.statemachine;
+
+import java.io.Serializable;
+
+/**
+ * The lifecycle state of a {@link org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask}.
+ * The task usually starts in the {@code CREATED} state and switches states according to this diagram:
+ * <pre>{@code
+ * CREATED -> INIT  -> READY -> READER_REGISTER_COMPLETE  -> ASSIGN -> WAITING_FEEDBACK -> READER_CLOSED -> CLOSED
+ * |        |          |                         |              |
+ * |        |          |                         |              |
+ * |        |          |                         |              |
+ * |        |          |                         |              |
+ * +--------+----------+-------------------------+--------------+--> CANCELLING ----> CANCELED
+ *                                                  ... -> FAILED }</pre>
+ */
+public enum EnumeratorState implements Serializable { // NOTE(review): enums are implicitly Serializable via java.lang.Enum; the explicit clause is redundant
+    CREATED,
+    INIT,
+    READY, // NOTE(review): SourceSplitEnumeratorTask.stateProcess() currently goes INIT -> READER_REGISTER_COMPLETE and never sets READY
+    READER_REGISTER_COMPLETE,
+    ASSIGN,
+    WAITING_FEEDBACK,
+    READER_CLOSED,
+    CLOSED,
+    CANCELLING, // NOTE(review): no visible code transitions into this state yet (cancel-from-outside is a TODO)
+    CANCELED,
+    FAILED // NOTE(review): not set by the visible stateProcess() switch - confirm where failures are recorded
+}