You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/05/30 07:48:23 UTC
[seatunnel] branch dev updated: [Improve][Zeta] async execute checkpoint trigger and other block method (#4846)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 83fa97bc4 [Improve][Zeta] async execute checkpoint trigger and other block method (#4846)
83fa97bc4 is described below
commit 83fa97bc4afd61db8647602064138bcc700a00b3
Author: Jia Fan <fa...@qq.com>
AuthorDate: Tue May 30 15:48:17 2023 +0800
[Improve][Zeta] async execute checkpoint trigger and other block method (#4846)
* [Improve][Zeta] Execute checkpoint triggers asynchronously
* [Bug][Zeta] Fix Zeta being unable to recycle threads belonging to abnormal tasks
* [Improve][Zeta] Move `restoreState` and `addSplitsBack` to be executed by TaskExecutionService
* [Improve][Zeta] Move `receivedReader` to be executed by TaskExecutionService
---
config/hazelcast.yaml | 2 +-
.../engine/server/TaskExecutionService.java | 47 ++++++++++++++++--
.../server/checkpoint/CheckpointCloseReason.java | 3 +-
.../server/checkpoint/CheckpointCoordinator.java | 6 +++
.../server/checkpoint/CheckpointManager.java | 4 ++
.../CheckpointBarrierTriggerOperation.java | 25 ++++++----
.../operation/CheckpointErrorReportOperation.java | 56 ++++++++++++++++++++++
.../operation/NotifyTaskRestoreOperation.java | 32 +++++++++++--
.../engine/server/dag/physical/PhysicalPlan.java | 2 +-
.../seatunnel/engine/server/execution/Task.java | 4 ++
.../server/execution/TaskExecutionContext.java | 10 +++-
.../serializable/CheckpointDataSerializerHook.java | 5 ++
.../seatunnel/engine/server/task/AbstractTask.java | 1 +
.../engine/server/task/flow/SinkFlowLifeCycle.java | 3 +-
.../group/queue/IntermediateBlockingQueue.java | 2 +-
.../operation/checkpoint/BarrierFlowOperation.java | 28 +++++++----
.../operation/source/RestoredSplitOperation.java | 16 ++++++-
.../operation/source/SourceRegisterOperation.java | 23 +++++++--
18 files changed, 232 insertions(+), 37 deletions(-)
diff --git a/config/hazelcast.yaml b/config/hazelcast.yaml
index 87f607960..934712a82 100644
--- a/config/hazelcast.yaml
+++ b/config/hazelcast.yaml
@@ -37,5 +37,5 @@ hazelcast:
hazelcast.invocation.max.retry.count: 20
hazelcast.tcp.join.port.try.count: 30
hazelcast.logging.type: log4j2
- hazelcast.operation.generic.thread.count: 1000
+ hazelcast.operation.generic.thread.count: 50
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 0b95baded..3c735148b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -70,6 +70,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -122,6 +123,10 @@ public class TaskExecutionService implements DynamicMetricsProvider {
new ConcurrentHashMap<>();
private final ConcurrentMap<TaskGroupLocation, TaskGroupContext> finishedExecutionContexts =
new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<TaskGroupLocation, Map<String, CompletableFuture<?>>>
+ taskAsyncFunctionFuture = new ConcurrentHashMap<>();
+
private final ConcurrentMap<TaskGroupLocation, CompletableFuture<Void>> cancellationFutures =
new ConcurrentHashMap<>();
private final SeaTunnelConfig seaTunnelConfig;
@@ -305,7 +310,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
.peek(
task -> {
TaskExecutionContext taskExecutionContext =
- new TaskExecutionContext(task, nodeEngine);
+ new TaskExecutionContext(task, nodeEngine, this);
task.setTaskExecutionContext(taskExecutionContext);
taskExecutionContextMap.put(
task.getTaskID(), taskExecutionContext);
@@ -405,6 +410,25 @@ public class TaskExecutionService implements DynamicMetricsProvider {
}
}
+    /**
+     * Runs {@code task} asynchronously on this service's executor on behalf of the given task
+     * group, so the calling (operation) thread is not blocked by the function.
+     *
+     * <p>The returned future is tracked per task group so that {@code cancelAsyncFunction} can
+     * cancel whatever is still pending when the group finishes or fails; a function removes
+     * itself from the tracking map once it completes.
+     *
+     * <p>NOTE(review): {@link CompletableFuture#cancel(boolean)} does not interrupt a function
+     * that is already running, so cancellation only prevents functions that have not started.
+     *
+     * @param taskGroupLocation the task group the function belongs to
+     * @param task the function to run; anything it throws is captured by the future only
+     */
+    public void asyncExecuteFunction(TaskGroupLocation taskGroupLocation, Runnable task) {
+        String id = UUID.randomUUID().toString();
+        logger.fine("accept async execute function from " + taskGroupLocation + " with id " + id);
+        CompletableFuture<?> future = CompletableFuture.runAsync(task, executorService);
+        // Register atomically: a separate containsKey/put/get sequence races with
+        // cancelAsyncFunction removing the group's map and could throw NullPointerException.
+        taskAsyncFunctionFuture.compute(
+                taskGroupLocation,
+                (location, futures) -> {
+                    Map<String, CompletableFuture<?>> tracked =
+                            futures == null ? new ConcurrentHashMap<>() : futures;
+                    tracked.put(id, future);
+                    return tracked;
+                });
+        future.whenComplete(
+                (ignoredResult, ignoredError) -> {
+                    // The group's map may already have been removed by cancelAsyncFunction, so
+                    // only touch it if present; drop it once the last tracked function is done.
+                    taskAsyncFunctionFuture.computeIfPresent(
+                            taskGroupLocation,
+                            (location, futures) -> {
+                                futures.remove(id);
+                                return futures.isEmpty() ? null : futures;
+                            });
+                    logger.fine(
+                            "remove async execute function from "
+                                    + taskGroupLocation
+                                    + " with id "
+                                    + id);
+                });
+    }
+
public void notifyCleanTaskGroupContext(TaskGroupLocation taskGroupLocation) {
finishedExecutionContexts.remove(taskGroupLocation);
}
@@ -750,7 +774,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
"cancellationFuture should be completed exceptionally");
}
exception(e);
- cancelAllTask();
+ cancelAllTask(taskGroup.getTaskGroupLocation());
}));
}
@@ -758,13 +782,27 @@ public class TaskExecutionService implements DynamicMetricsProvider {
executionException.compareAndSet(null, t);
}
- private void cancelAllTask() {
+ private void cancelAllTask(TaskGroupLocation taskGroupLocation) {
try {
blockingFutures.forEach(f -> f.cancel(true));
currRunningTaskFuture.values().forEach(f -> f.cancel(true));
} catch (CancellationException ignore) {
// ignore
}
+ cancelAsyncFunction(taskGroupLocation);
+ }
+
+    /**
+     * Stops tracking the async functions of the given task group and cancels those that have not
+     * completed yet.
+     *
+     * <p>NOTE(review): cancelling a {@link CompletableFuture} does not interrupt a function that is
+     * already running.
+     */
+    private void cancelAsyncFunction(TaskGroupLocation taskGroupLocation) {
+        // One atomic remove avoids the containsKey/remove race. cancel() is a no-op on futures
+        // that are already done and never throws CancellationException, so no filtering or
+        // catch block is needed.
+        Map<String, CompletableFuture<?>> pending =
+                taskAsyncFunctionFuture.remove(taskGroupLocation);
+        if (pending != null) {
+            pending.values().forEach(f -> f.cancel(true));
+        }
}
void taskDone(Task task) {
@@ -778,6 +816,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
finishedExecutionContexts.put(
taskGroupLocation, executionContexts.remove(taskGroupLocation));
cancellationFutures.remove(taskGroupLocation);
+ cancelAsyncFunction(taskGroupLocation);
if (ex == null) {
future.complete(
new TaskExecutionState(taskGroupLocation, ExecutionState.FINISHED));
@@ -792,7 +831,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
}
}
if (!isCancel.get() && ex != null) {
- cancelAllTask();
+ cancelAllTask(taskGroupLocation);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index 7962ebc75..ae1af4d41 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -19,7 +19,8 @@ package org.apache.seatunnel.engine.server.checkpoint;
public enum CheckpointCloseReason {
PIPELINE_END("Pipeline turn to end state."),
- CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
+ CHECKPOINT_EXPIRED(
+ "Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml"),
CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 091fe99be..1b33c4c64 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
@@ -279,6 +280,11 @@ public class CheckpointCoordinator {
.toArray(InvocationFuture[]::new);
}
+    /**
+     * Handles an error reported by a task by passing it to {@code handleCoordinatorError} with
+     * reason {@link CheckpointCloseReason#CHECKPOINT_INSIDE_ERROR}.
+     *
+     * @param errorMsg the error description sent by the task
+     */
+    public void reportCheckpointErrorFromTask(String errorMsg) {
+        SeaTunnelException cause = new SeaTunnelException(errorMsg);
+        handleCoordinatorError(CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR, cause);
+    }
+
private void scheduleTriggerPendingCheckpoint(long delayMills) {
scheduleTriggerPendingCheckpoint(CHECKPOINT_TYPE, delayMills);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 48bc9a454..39d25636c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -174,6 +174,10 @@ public class CheckpointManager {
return getCheckpointCoordinator(taskLocation.getPipelineId());
}
+    /**
+     * Forwards an error reported by a task to the checkpoint coordinator of the task's pipeline.
+     *
+     * @param taskLocation location of the reporting task
+     * @param errorMsg the error description sent by the task
+     */
+    public void reportCheckpointErrorFromTask(TaskLocation taskLocation, String errorMsg) {
+        CheckpointCoordinator coordinator = getCheckpointCoordinator(taskLocation);
+        coordinator.reportCheckpointErrorFromTask(errorMsg);
+    }
+
private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
if (coordinator == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
index cd9d54837..8ed7985fd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
@@ -34,8 +34,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
-
@NoArgsConstructor
@Slf4j
public class CheckpointBarrierTriggerOperation extends TaskOperation {
@@ -79,12 +77,23 @@ public class CheckpointBarrierTriggerOperation extends TaskOperation {
.getExecutionContext(taskLocation.getTaskGroupLocation())
.getTaskGroup()
.getTask(taskLocation.getTaskID());
- try {
- log.debug("CheckpointBarrierTriggerOperation [{}]", taskLocation);
- task.triggerBarrier(barrier);
- } catch (Exception e) {
- sneakyThrow(e);
- }
+                    // Trigger the barrier off the calling thread; a failure is reported to the
+                    // master as a CheckpointErrorReportOperation instead of being thrown here.
+                    Runnable triggerBarrier =
+                            () -> {
+                                try {
+                                    log.debug("CheckpointBarrierTriggerOperation [{}]", taskLocation);
+                                    task.triggerBarrier(barrier);
+                                } catch (Exception e) {
+                                    task.getExecutionContext()
+                                            .sendToMaster(
+                                                    new CheckpointErrorReportOperation(taskLocation, e));
+                                }
+                            };
+                    task.getExecutionContext()
+                            .getTaskExecutionService()
+                            .asyncExecuteFunction(taskLocation.getTaskGroupLocation(), triggerBarrier);
return null;
},
new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointErrorReportOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointErrorReportOperation.java
new file mode 100644
index 000000000..75ec924b7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointErrorReportOperation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint.operation;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import lombok.NoArgsConstructor;
+
+import java.io.IOException;
+
+@NoArgsConstructor
+public class CheckpointErrorReportOperation extends TaskOperation {
+
+ private String errorMsg;
+
+ public CheckpointErrorReportOperation(TaskLocation taskLocation, Throwable e) {
+ super(taskLocation);
+ this.errorMsg = ExceptionUtils.getMessage(e);
+ }
+
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer server = getService();
+ server.getCoordinatorService()
+ .getJobMaster(taskLocation.getJobId())
+ .getCheckpointManager()
+ .reportCheckpointErrorFromTask(taskLocation, errorMsg);
+ }
+
+ @Override
+ public int getFactoryId() {
+ return CheckpointDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return CheckpointDataSerializerHook.CHECKPOINT_ERROR_REPORT_OPERATOR;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
index 869252458..c50abad48 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
@@ -91,11 +91,33 @@ public class NotifyTaskRestoreOperation extends TaskOperation {
Task task = groupContext.getTaskGroup().getTask(taskLocation.getTaskID());
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(groupContext.getClassLoader());
- log.debug("NotifyTaskRestoreOperation.restoreState " + restoredState);
- task.restoreState(restoredState);
- log.debug("NotifyTaskRestoreOperation.finished " + restoredState);
- Thread.currentThread().setContextClassLoader(classLoader);
+            // Restore the task state on the task execution service's executor instead of the
+            // calling thread; failures are reported to the master rather than thrown here.
+            task.getExecutionContext()
+                    .getTaskExecutionService()
+                    .asyncExecuteFunction(
+                            taskLocation.getTaskGroupLocation(),
+                            () -> {
+                                // Save and restore the context class loader of the executor
+                                // thread that actually runs this function. Restoring the loader
+                                // captured on the calling thread would leak it into the pooled
+                                // executor thread.
+                                ClassLoader previousClassLoader =
+                                        Thread.currentThread().getContextClassLoader();
+                                Thread.currentThread()
+                                        .setContextClassLoader(groupContext.getClassLoader());
+                                try {
+                                    log.debug("NotifyTaskRestoreOperation.restoreState " + restoredState);
+                                    task.restoreState(restoredState);
+                                    log.debug("NotifyTaskRestoreOperation.finished " + restoredState);
+                                } catch (Exception e) {
+                                    task.getExecutionContext()
+                                            .sendToMaster(
+                                                    new CheckpointErrorReportOperation(taskLocation, e));
+                                } finally {
+                                    Thread.currentThread().setContextClassLoader(previousClassLoader);
+                                }
+                            });
+
} catch (Exception e) {
throw new SeaTunnelException(e);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index f8d59188b..4a81fdf63 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -174,7 +174,7 @@ public class PhysicalPlan {
}
if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
- JobStatus jobStatus = JobStatus.FAILING;
+ JobStatus jobStatus;
if (failedPipelineNum.get() > 0) {
jobStatus = JobStatus.FAILING;
updateJobState(jobStatus);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
index 58afa695d..d2b184e85 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/Task.java
@@ -49,6 +49,10 @@ public interface Task
default void setTaskExecutionContext(TaskExecutionContext taskExecutionContext) {}
+ /**
+ * Returns the execution context of this task. The default implementation returns {@code null};
+ * implementations that keep the context passed to {@link #setTaskExecutionContext} override it.
+ *
+ * NOTE(review): the async call sites added in this change dereference the result without a
+ * null check, so tasks relying on this default would fail there.
+ */
+ default TaskExecutionContext getExecutionContext() {
+ return null;
+ }
+
default void triggerBarrier(Barrier barrier) throws Exception {}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 50faac151..55249babc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.execution;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.server.TaskExecutionService;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
@@ -33,10 +34,13 @@ public class TaskExecutionContext {
private final Task task;
private final NodeEngineImpl nodeEngine;
+ private final TaskExecutionService taskExecutionService;
- public TaskExecutionContext(Task task, NodeEngineImpl nodeEngine) {
+ public TaskExecutionContext(
+ Task task, NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService) {
this.task = task;
this.nodeEngine = nodeEngine;
+ this.taskExecutionService = taskExecutionService;
}
public <E> InvocationFuture<E> sendToMaster(Operation operation) {
@@ -61,6 +65,10 @@ public class TaskExecutionContext {
return (T) task;
}
+ /** Returns the {@link TaskExecutionService} passed to this context's constructor. */
+ public TaskExecutionService getTaskExecutionService() {
+ return taskExecutionService;
+ }
+
public HazelcastInstance getInstance() {
return nodeEngine.getHazelcastInstance();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java
index 4b4287641..3349a1070 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
+import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskStartOperation;
@@ -41,6 +42,8 @@ public final class CheckpointDataSerializerHook implements DataSerializerHook {
public static final int NOTIFY_TASK_RESTORE_OPERATOR = 5;
public static final int NOTIFY_TASK_START_OPERATOR = 6;
+ public static final int CHECKPOINT_ERROR_REPORT_OPERATOR = 7;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_CHECKPOINT_DATA_SERIALIZER_FACTORY,
@@ -73,6 +76,8 @@ public final class CheckpointDataSerializerHook implements DataSerializerHook {
return new NotifyTaskRestoreOperation();
case NOTIFY_TASK_START_OPERATOR:
return new NotifyTaskStartOperation();
+ case CHECKPOINT_ERROR_REPORT_OPERATOR:
+ return new CheckpointErrorReportOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 3129c0b94..4fc23f34c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -68,6 +68,7 @@ public abstract class AbstractTask implements Task {
this.executionContext = taskExecutionContext;
}
+ @Override
public TaskExecutionContext getExecutionContext() {
return executionContext;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 0346b7313..17860945e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -190,7 +190,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
.getExecutionContext()
.sendToMember(
new BarrierFlowOperation(barrier, committerTaskLocation),
- committerTaskAddress);
+ committerTaskAddress)
+ .join();
}
}
runningTask.ack(barrier);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
index c08150c2b..5b2de4c50 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
@@ -57,7 +57,7 @@ public class IntermediateBlockingQueue extends AbstractIntermediateQueue<Blockin
@Override
public void close() throws IOException {
- // nothing
+ getIntermediateQueue().clear();
}
private void handleRecord(Record<?> record, ConsumerWithException<Record<?>> consumer)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
index 02b224443..89d3b3805 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/BarrierFlowOperation.java
@@ -17,10 +17,10 @@
package org.apache.seatunnel.engine.server.task.operation.checkpoint;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -35,8 +35,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
-
@NoArgsConstructor
@Slf4j
public class BarrierFlowOperation extends TaskOperation {
@@ -80,13 +78,23 @@ public class BarrierFlowOperation extends TaskOperation {
.getExecutionContext(taskLocation.getTaskGroupLocation())
.getTaskGroup()
.getTask(taskLocation.getTaskID());
- try {
- log.debug("BarrierFlowOperation [{}]", taskLocation);
- task.triggerBarrier(barrier);
- } catch (Exception e) {
- log.warn(ExceptionUtils.getMessage(e));
- sneakyThrow(e);
- }
+                    // Trigger the barrier off the calling thread; a failure is reported to the
+                    // master as a CheckpointErrorReportOperation instead of being thrown here.
+                    task.getExecutionContext()
+                            .getTaskExecutionService()
+                            .asyncExecuteFunction(
+                                    taskLocation.getTaskGroupLocation(),
+                                    () -> {
+                                        try {
+                                            log.debug("BarrierFlowOperation [{}]", taskLocation);
+                                            task.triggerBarrier(barrier);
+                                        } catch (Exception e) {
+                                            task.getExecutionContext()
+                                                    .sendToMaster(
+                                                            new CheckpointErrorReportOperation(
+                                                                    taskLocation, e));
+                                        }
+                                    });
return null;
},
new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 65abfd7d8..0c9c3d95c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.TaskExecutionService;
+import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -95,7 +96,20 @@ public class RestoredSplitOperation extends TaskOperation {
.collect(Collectors.toList());
SourceSplitEnumeratorTask<SourceSplit> task =
taskExecutionService.getTask(taskLocation);
- task.addSplitsBack(deserialize, subtaskIndex);
+                    // Hand the splits back to the enumerator off the calling thread; a failure is
+                    // reported to the master instead of failing this operation.
+                    Runnable addSplitsBack =
+                            () -> {
+                                try {
+                                    task.addSplitsBack(deserialize, subtaskIndex);
+                                } catch (Exception e) {
+                                    task.getExecutionContext()
+                                            .sendToMaster(
+                                                    new CheckpointErrorReportOperation(taskLocation, e));
+                                }
+                            };
+                    task.getExecutionContext()
+                            .getTaskExecutionService()
+                            .asyncExecuteFunction(taskLocation.getTaskGroupLocation(), addSplitsBack);
return null;
},
new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 7e0d5ce3a..faf44b4c4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -60,11 +61,27 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
.getClassLoader();
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(classLoader);
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
- task.receivedReader(readerTaskID, readerAddress);
- Thread.currentThread().setContextClassLoader(oldClassLoader);
+                    // Register the reader with the enumerator off the calling thread; a failure is
+                    // reported to the master instead of failing this operation.
+                    task.getExecutionContext()
+                            .getTaskExecutionService()
+                            .asyncExecuteFunction(
+                                    enumeratorTaskID.getTaskGroupLocation(),
+                                    () -> {
+                                        // Save and restore the executor thread's own context class
+                                        // loader; restoring the one captured on the calling thread
+                                        // would leak it into the pooled executor thread.
+                                        ClassLoader previousClassLoader =
+                                                Thread.currentThread().getContextClassLoader();
+                                        try {
+                                            Thread.currentThread().setContextClassLoader(classLoader);
+                                            task.receivedReader(readerTaskID, readerAddress);
+                                        } catch (Exception e) {
+                                            task.getExecutionContext()
+                                                    .sendToMaster(
+                                                            new CheckpointErrorReportOperation(
+                                                                    enumeratorTaskID, e));
+                                        } finally {
+                                            Thread.currentThread()
+                                                    .setContextClassLoader(previousClassLoader);
+                                        }
+                                    });
return null;
},
new RetryUtils.RetryMaterial(