You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/22 02:39:29 UTC
[incubator-seatunnel] branch dev updated: [engine][checkpoint] add barrier flow operation (#3158)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fe770627b [engine][checkpoint] add barrier flow operation (#3158)
fe770627b is described below
commit fe770627bb662b275dee8ec02268efc39e346dfd
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Sat Oct 22 10:39:23 2022 +0800
[engine][checkpoint] add barrier flow operation (#3158)
---
.../serializable/TaskDataSerializerHook.java | 5 ++
.../server/task/SourceSplitEnumeratorTask.java | 4 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 4 +-
.../BarrierFlowOperation.java} | 67 ++++++++++++----------
.../operation/sink/SinkPrepareCommitOperation.java | 4 +-
5 files changed, 48 insertions(+), 36 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 19c2d7114..2161c6cbe 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
+import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
import org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequestOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
@@ -69,6 +70,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
public static final int NOTIFY_TASK_STATUS_OPERATOR = 15;
+ public static final int BARRIER_FLOW_OPERATOR = 16;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -119,6 +122,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
return new RestoredSplitOperation();
case NOTIFY_TASK_STATUS_OPERATOR:
return new NotifyTaskStatusOperation();
+ case BARRIER_FLOW_OPERATOR:
+ return new BarrierFlowOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 1f8163e3f..be5927f10 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -32,11 +32,11 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
-import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
+import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
@@ -134,7 +134,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
if (barrier.snapshot()) {
snapshotState = enumerator.snapshotState(barrierId);
}
- sendToAllReader(location -> new CheckpointBarrierTriggerOperation(barrier, location));
+ sendToAllReader(location -> new BarrierFlowOperation(barrier, location));
}
if (barrier.snapshot()) {
byte[] serialize = enumeratorStateSerializer.serialize(snapshotState);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 6b764aea0..eeba5588b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -28,11 +28,11 @@ import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
-import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
+import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
@@ -142,7 +142,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
}
} else {
if (containAggCommitter) {
- runningTask.getExecutionContext().sendToMember(new CheckpointBarrierTriggerOperation(barrier, committerTaskLocation), committerTaskAddress);
+ runningTask.getExecutionContext().sendToMember(new BarrierFlowOperation(barrier, committerTaskLocation), committerTaskAddress);
}
}
runningTask.ack(barrier);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
similarity index 53%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
index f206071f9..8e7a24ca4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
@@ -15,15 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.operation.sink;
+package org.apache.seatunnel.engine.server.task.operation.checkpoint;
-import org.apache.seatunnel.common.utils.SerializationUtils;
+import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
+
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.TaskExecutionService;
-import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
+import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.nio.ObjectDataInput;
@@ -33,47 +35,52 @@ import lombok.NoArgsConstructor;
import java.io.IOException;
@NoArgsConstructor
-public class SinkPrepareCommitOperation extends CheckpointBarrierTriggerOperation {
- private byte[] commitInfos;
+public class BarrierFlowOperation extends TaskOperation {
+ protected Barrier barrier;
- public SinkPrepareCommitOperation(Barrier checkpointBarrier, TaskLocation taskLocation, byte[] commitInfos) {
- super(checkpointBarrier, taskLocation);
- this.commitInfos = commitInfos;
+ public BarrierFlowOperation(Barrier barrier, TaskLocation taskLocation) {
+ super(taskLocation);
+ this.barrier = barrier;
}
@Override
- protected void writeInternal(ObjectDataOutput out) throws IOException {
- super.writeInternal(out);
- out.writeByteArray(commitInfos);
- }
-
- @Override
- protected void readInternal(ObjectDataInput in) throws IOException {
- super.readInternal(in);
- commitInfos = in.readByteArray();
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
}
@Override
- public String getServiceName() {
- return SeaTunnelServer.SERVICE_NAME;
+ public int getClassId() {
+ return TaskDataSerializerHook.BARRIER_FLOW_OPERATOR;
}
@Override
- public int getFactoryId() {
- return TaskDataSerializerHook.FACTORY_ID;
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeObject(barrier);
}
@Override
- public int getClassId() {
- return TaskDataSerializerHook.SINK_PREPARE_COMMIT_TYPE;
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ // TODO: support another barrier
+ barrier = in.readObject();
}
@Override
public void run() throws Exception {
- TaskExecutionService taskExecutionService = ((SeaTunnelServer) getService()).getTaskExecutionService();
- SinkAggregatedCommitterTask<?, ?> committerTask = taskExecutionService.getTask(taskLocation);
- ClassLoader classLoader = taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
- committerTask.receivedWriterCommitInfo(barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader));
- committerTask.triggerBarrier(barrier);
+ SeaTunnelServer server = getService();
+ RetryUtils.retryWithException(() -> {
+ Task task = server.getTaskExecutionService()
+ .getExecutionContext(taskLocation.getTaskGroupLocation()).getTaskGroup()
+ .getTask(taskLocation.getTaskID());
+ try {
+ task.triggerBarrier(barrier);
+ } catch (Exception e) {
+ sneakyThrow(e);
+ }
+ return null;
+ }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(taskLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index f206071f9..2b26b482b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -20,10 +20,10 @@ package org.apache.seatunnel.engine.server.task.operation.sink;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.TaskExecutionService;
-import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
+import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.nio.ObjectDataInput;
@@ -33,7 +33,7 @@ import lombok.NoArgsConstructor;
import java.io.IOException;
@NoArgsConstructor
-public class SinkPrepareCommitOperation extends CheckpointBarrierTriggerOperation {
+public class SinkPrepareCommitOperation extends BarrierFlowOperation {
private byte[] commitInfos;
public SinkPrepareCommitOperation(Barrier checkpointBarrier, TaskLocation taskLocation, byte[] commitInfos) {