You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/24 01:56:22 UTC

[incubator-seatunnel] branch 2.3.0-beta-release updated: [engine][checkpoint] add barrier flow operation (#3158)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch 2.3.0-beta-release
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/2.3.0-beta-release by this push:
     new 28554418a [engine][checkpoint] add barrier flow operation (#3158)
     new 2a8c33111 Merge branch '2.3.0-beta-release' of https://github.com/apache/incubator-seatunnel into 2.3.0-beta-release
28554418a is described below

commit 28554418a80577b22afdcfdc9d8f465b227a9f80
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Sat Oct 22 10:39:23 2022 +0800

    [engine][checkpoint] add barrier flow operation (#3158)
---
 .../serializable/TaskDataSerializerHook.java       |  5 ++
 .../server/task/SourceSplitEnumeratorTask.java     |  4 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  4 +-
 .../BarrierFlowOperation.java}                     | 67 ++++++++++++----------
 .../operation/sink/SinkPrepareCommitOperation.java |  4 +-
 5 files changed, 48 insertions(+), 36 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 19c2d7114..2161c6cbe 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
 import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
 import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
 import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
+import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
 import org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequestOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
@@ -69,6 +70,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
 
     public static final int NOTIFY_TASK_STATUS_OPERATOR = 15;
 
+    public static final int BARRIER_FLOW_OPERATOR = 16;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -119,6 +122,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
                     return new RestoredSplitOperation();
                 case NOTIFY_TASK_STATUS_OPERATOR:
                     return new NotifyTaskStatusOperation();
+                case BARRIER_FLOW_OPERATOR:
+                    return new BarrierFlowOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 1f8163e3f..be5927f10 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -32,11 +32,11 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
-import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
 import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
+import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
@@ -134,7 +134,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
             if (barrier.snapshot()) {
                 snapshotState = enumerator.snapshotState(barrierId);
             }
-            sendToAllReader(location -> new CheckpointBarrierTriggerOperation(barrier, location));
+            sendToAllReader(location -> new BarrierFlowOperation(barrier, location));
         }
         if (barrier.snapshot()) {
             byte[] serialize = enumeratorStateSerializer.serialize(snapshotState);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 6b764aea0..eeba5588b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -28,11 +28,11 @@ import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
-import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
 import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
+import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
 import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
@@ -142,7 +142,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
                     }
                 } else {
                     if (containAggCommitter) {
-                        runningTask.getExecutionContext().sendToMember(new CheckpointBarrierTriggerOperation(barrier, committerTaskLocation), committerTaskAddress);
+                        runningTask.getExecutionContext().sendToMember(new BarrierFlowOperation(barrier, committerTaskLocation), committerTaskAddress);
                     }
                 }
                 runningTask.ack(barrier);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
similarity index 53%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
index f206071f9..8e7a24ca4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
@@ -15,15 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.operation.sink;
+package org.apache.seatunnel.engine.server.task.operation.checkpoint;
 
-import org.apache.seatunnel.common.utils.SerializationUtils;
+import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
+
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.TaskExecutionService;
-import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
+import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -33,47 +35,52 @@ import lombok.NoArgsConstructor;
 import java.io.IOException;
 
 @NoArgsConstructor
-public class SinkPrepareCommitOperation extends CheckpointBarrierTriggerOperation {
-    private byte[] commitInfos;
+public class BarrierFlowOperation extends TaskOperation {
+    protected Barrier barrier;
 
-    public SinkPrepareCommitOperation(Barrier checkpointBarrier, TaskLocation taskLocation, byte[] commitInfos) {
-        super(checkpointBarrier, taskLocation);
-        this.commitInfos = commitInfos;
+    public BarrierFlowOperation(Barrier barrier, TaskLocation taskLocation) {
+        super(taskLocation);
+        this.barrier = barrier;
     }
 
     @Override
-    protected void writeInternal(ObjectDataOutput out) throws IOException {
-        super.writeInternal(out);
-        out.writeByteArray(commitInfos);
-    }
-
-    @Override
-    protected void readInternal(ObjectDataInput in) throws IOException {
-        super.readInternal(in);
-        commitInfos = in.readByteArray();
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
     }
 
     @Override
-    public String getServiceName() {
-        return SeaTunnelServer.SERVICE_NAME;
+    public int getClassId() {
+        return TaskDataSerializerHook.BARRIER_FLOW_OPERATOR;
     }
 
     @Override
-    public int getFactoryId() {
-        return TaskDataSerializerHook.FACTORY_ID;
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeObject(barrier);
     }
 
     @Override
-    public int getClassId() {
-        return TaskDataSerializerHook.SINK_PREPARE_COMMIT_TYPE;
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        // TODO: support another barrier
+        barrier = in.readObject();
     }
 
     @Override
     public void run() throws Exception {
-        TaskExecutionService taskExecutionService = ((SeaTunnelServer) getService()).getTaskExecutionService();
-        SinkAggregatedCommitterTask<?, ?> committerTask = taskExecutionService.getTask(taskLocation);
-        ClassLoader classLoader = taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
-        committerTask.receivedWriterCommitInfo(barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader));
-        committerTask.triggerBarrier(barrier);
+        SeaTunnelServer server = getService();
+        RetryUtils.retryWithException(() -> {
+            Task task = server.getTaskExecutionService()
+                .getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup()
+                .getTask(taskLocation.getTaskID());
+            try {
+                task.triggerBarrier(barrier);
+            } catch (Exception e) {
+                sneakyThrow(e);
+            }
+            return null;
+        }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(taskLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index f206071f9..2b26b482b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -20,10 +20,10 @@ package org.apache.seatunnel.engine.server.task.operation.sink;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.TaskExecutionService;
-import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.hazelcast.nio.ObjectDataInput;
@@ -33,7 +33,7 @@ import lombok.NoArgsConstructor;
 import java.io.IOException;
 
 @NoArgsConstructor
-public class SinkPrepareCommitOperation extends CheckpointBarrierTriggerOperation {
+public class SinkPrepareCommitOperation extends BarrierFlowOperation {
     private byte[] commitInfos;
 
     public SinkPrepareCommitOperation(Barrier checkpointBarrier, TaskLocation taskLocation, byte[] commitInfos) {