You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/31 04:53:42 UTC
[incubator-seatunnel] branch dev updated: [Bug][Engine Server] fix checkpoint stuck error (#3213)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a66923a93 [Bug][Engine Server] fix checkpoint stuck error (#3213)
a66923a93 is described below
commit a66923a93a6b0d4fa6543991cc7ad990b31c3666
Author: Eric <ga...@gmail.com>
AuthorDate: Mon Oct 31 12:53:35 2022 +0800
[Bug][Engine Server] fix checkpoint stuck error (#3213)
* add debug logs to PipelineBaseScheduler
* [Engine] [Log] add apply resource log
* [Engine] [Resource] fix resource active check bug
* [Engine] [Test] Add operation retry count
---
.../src/test/resources/log4j.properties | 1 +
.../apache/seatunnel/engine/common/Constant.java | 2 +-
.../server/checkpoint/CheckpointCoordinator.java | 1 +
.../engine/server/execution/TaskGroupLocation.java | 9 +++++
.../resourcemanager/AbstractResourceManager.java | 15 ++++++--
.../server/scheduler/PipelineBaseScheduler.java | 43 ++++++++++++----------
6 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
index c4398d09b..8757e3b86 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
@@ -22,3 +22,4 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator=DEBUG
+log4j.logger.org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler=DEBUG
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 37040a6ef..f7534884e 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -36,7 +36,7 @@ public class Constant {
public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML = "seatunnel.yaml";
- public static final int OPERATION_RETRY_TIME = 5;
+ public static final int OPERATION_RETRY_TIME = 10;
public static final int OPERATION_RETRY_SLEEP = 2000;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index dd82f64c2..fc2ec8a15 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -381,6 +381,7 @@ public class CheckpointCoordinator {
);
// TODO: clear related future & scheduler task
pendingCheckpoints.clear();
+ pendingCounter.set(0);
scheduler.shutdownNow();
scheduler = Executors.newScheduledThreadPool(
1, runnable -> {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
index 250846af5..ef453cf1a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
@@ -52,4 +52,13 @@ public class TaskGroupLocation implements Serializable {
public int hashCode() {
return new HashCodeBuilder(17, 37).append(jobId).append(pipelineId).append(taskGroupId).toHashCode();
}
+
+ @Override
+ public String toString() { // renders as TaskGroupLocation{jobId=..., pipelineId=..., taskGroupId=...}
+ return new StringBuilder("TaskGroupLocation{")
+ .append("jobId=").append(jobId)
+ .append(", pipelineId=").append(pipelineId)
+ .append(", taskGroupId=").append(taskGroupId)
+ .append('}').toString();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index e13c02807..3ad2c5dc2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -170,9 +170,18 @@ public abstract class AbstractResourceManager implements ResourceManager {
@Override
public boolean slotActiveCheck(SlotProfile profile) {
- return registerWorker.values().stream()
- .flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots()))
- .anyMatch(s -> s.getSlotID() == profile.getSlotID());
+ boolean active = false;
+ if (registerWorker.containsKey(profile.getWorker())) { // only the slot's own worker can hold it
+ active = Arrays.stream(registerWorker.get(profile.getWorker()).getAssignedSlots())
+ .anyMatch(s -> s.getSlotID() == profile.getSlotID()); // active iff this slot is among the worker's assigned slots (allMatch would reject multi-slot workers and vacuously accept workers with none)
+ }
+
+ if (!active) {
+ LOGGER.info("received slot active check failed, profile: " + profile);
+ } else {
+ LOGGER.fine("received slot active check success, profile: " + profile);
+ }
+ return active;
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index c0e95bf0c..7fdfe323f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -31,9 +31,8 @@ import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
@@ -43,8 +42,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+@Slf4j
public class PipelineBaseScheduler implements JobScheduler {
- private static final ILogger LOGGER = Logger.getLogger(PipelineBaseScheduler.class);
private final PhysicalPlan physicalPlan;
private final long jobId;
@@ -91,6 +90,8 @@ public class PipelineBaseScheduler implements JobScheduler {
Map<TaskGroupLocation, SlotProfile> slotProfiles =
getOrApplyResourceForPipeline(pipeline, jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation()));
+ log.debug("slotProfiles: {}", slotProfiles);
+
// To ensure release pipeline resource after new master node active, we need store slotProfiles first and then deploy tasks.
jobMaster.setOwnedSlotProfiles(pipeline.getPipelineLocation(), slotProfiles);
// deploy pipeline
@@ -125,14 +126,22 @@ public class PipelineBaseScheduler implements JobScheduler {
private SlotProfile getOrApplyResourceForTask(@NonNull PhysicalVertex task,
Map<TaskGroupLocation, SlotProfile> ownedSlotProfiles) {
+ SlotProfile oldProfile;
if (ownedSlotProfiles == null
|| ownedSlotProfiles.isEmpty()
- || ownedSlotProfiles.get(task.getTaskGroupLocation()) == null
- || !resourceManager.slotActiveCheck(ownedSlotProfiles.get(task.getTaskGroupLocation()))) {
- return applyResourceForTask(task).join();
+ || ownedSlotProfiles.get(task.getTaskGroupLocation()) == null) {
+ oldProfile = null;
+ } else {
+ oldProfile = ownedSlotProfiles.get(task.getTaskGroupLocation());
+ }
+ if (oldProfile == null || !resourceManager.slotActiveCheck(oldProfile)) { // reuse the owned slot only while the resource manager still reports it active
+ SlotProfile newProfile = applyResourceForTask(task).join();
+ log.info("use new profile: {} to replace not active profile: {} for task {}", newProfile, oldProfile, task);
+ return newProfile;
}
+ log.info("use active old profile: {} for task {}", oldProfile, task);
task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
- return ownedSlotProfiles.get(task.getTaskGroupLocation());
+ return oldProfile;
}
private Map<TaskGroupLocation, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
@@ -160,9 +169,7 @@ public class PipelineBaseScheduler implements JobScheduler {
return resourceManager.applyResource(jobId, new ResourceProfile());
} else if (ExecutionState.CANCELING.equals(task.getExecutionState()) ||
ExecutionState.CANCELED.equals(task.getExecutionState())) {
- LOGGER.info(
- String.format("%s be canceled, skip %s this task.", task.getTaskFullName(),
- ExecutionState.SCHEDULED));
+ log.info("{} be canceled, skip {} this task.", task.getTaskFullName(), ExecutionState.SCHEDULED);
return null;
} else {
makeTaskFailed(task.getTaskGroupLocation(),
@@ -184,8 +191,7 @@ public class PipelineBaseScheduler implements JobScheduler {
});
} else if (ExecutionState.CANCELING.equals(task.getExecutionState()) ||
ExecutionState.CANCELED.equals(task.getExecutionState())) {
- LOGGER.info(
- String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), ExecutionState.DEPLOYING));
+ log.info("{} be canceled, skip {} this task.", task.getTaskFullName(), ExecutionState.DEPLOYING);
return null;
} else {
jobMaster.updateTaskExecutionState(
@@ -218,9 +224,7 @@ public class PipelineBaseScheduler implements JobScheduler {
deployCoordinatorFuture.toArray(new CompletableFuture[0]));
voidCompletableFuture.get();
if (!pipeline.updatePipelineState(PipelineStatus.DEPLOYING, PipelineStatus.RUNNING)) {
- LOGGER.info(
- String.format("%s turn to state %s, skip the running state.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState()));
+ log.info("{} turn to state {}, skip the running state.", pipeline.getPipelineFullName(), pipeline.getPipelineState());
}
} catch (Exception e) {
makePipelineFailed(pipeline, e);
@@ -228,8 +232,8 @@ public class PipelineBaseScheduler implements JobScheduler {
} else if (PipelineStatus.CANCELING.equals(pipeline.getPipelineState()) ||
PipelineStatus.CANCELED.equals(pipeline.getPipelineState())) {
// may be canceled
- LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState(), PipelineStatus.DEPLOYING));
+ log.info("{} turn to state {}, skip {} this pipeline.", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState(), PipelineStatus.DEPLOYING);
} else {
makePipelineFailed(pipeline, new JobException(
String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
@@ -246,9 +250,8 @@ public class PipelineBaseScheduler implements JobScheduler {
if (PipelineStatus.CANCELING.equals(pipeline.getPipelineState()) ||
PipelineStatus.CANCELED.equals(pipeline.getPipelineState())) {
// may be canceled
- LOGGER.info(
- String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState(), targetState));
+ log.info("{} turn to state {}, skip {} this pipeline.", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState(), targetState);
} else {
throw new JobException(
String.format("%s turn to a unexpected state: %s, stop scheduler job",