You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/31 04:53:42 UTC

[incubator-seatunnel] branch dev updated: [Bug][Engine Server] fix checkpoint stuck error (#3213)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a66923a93 [Bug][Engine Server] fix checkpoint stuck error (#3213)
a66923a93 is described below

commit a66923a93a6b0d4fa6543991cc7ad990b31c3666
Author: Eric <ga...@gmail.com>
AuthorDate: Mon Oct 31 12:53:35 2022 +0800

    [Bug][Engine Server] fix checkpoint stuck error (#3213)
    
    
    * add debug logs to PipelineBaseScheduler
    
    * [Engine] [Log] add apply resource log
    
    * [Engine] [Resource] fix resource active check bug
    
    * [Engine] [Test] Add operation retry count
---
 .../src/test/resources/log4j.properties            |  1 +
 .../apache/seatunnel/engine/common/Constant.java   |  2 +-
 .../server/checkpoint/CheckpointCoordinator.java   |  1 +
 .../engine/server/execution/TaskGroupLocation.java |  9 +++++
 .../resourcemanager/AbstractResourceManager.java   | 15 ++++++--
 .../server/scheduler/PipelineBaseScheduler.java    | 43 ++++++++++++----------
 6 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
index c4398d09b..8757e3b86 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
@@ -22,3 +22,4 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
 
 log4j.logger.org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator=DEBUG
+log4j.logger.org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler=DEBUG
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 37040a6ef..f7534884e 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -36,7 +36,7 @@ public class Constant {
 
     public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML = "seatunnel.yaml";
 
-    public static final int OPERATION_RETRY_TIME = 5;
+    public static final int OPERATION_RETRY_TIME = 10;
 
     public static final int OPERATION_RETRY_SLEEP = 2000;
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index dd82f64c2..fc2ec8a15 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -381,6 +381,7 @@ public class CheckpointCoordinator {
             );
             // TODO: clear related future & scheduler task
             pendingCheckpoints.clear();
+            pendingCounter.set(0);
             scheduler.shutdownNow();
             scheduler = Executors.newScheduledThreadPool(
                 1, runnable -> {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
index 250846af5..ef453cf1a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
@@ -52,4 +52,13 @@ public class TaskGroupLocation implements Serializable {
     public int hashCode() {
         return new HashCodeBuilder(17, 37).append(jobId).append(pipelineId).append(taskGroupId).toHashCode();
     }
+
+    @Override
+    public String toString() { // renders the same three fields that hashCode() combines
+        return "TaskGroupLocation{" +
+            "jobId=" + jobId +
+            ", pipelineId=" + pipelineId +
+            ", taskGroupId=" + taskGroupId +
+            '}';
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index e13c02807..3ad2c5dc2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -170,9 +170,18 @@ public abstract class AbstractResourceManager implements ResourceManager {
 
     @Override
     public boolean slotActiveCheck(SlotProfile profile) {
-        return registerWorker.values().stream()
-            .flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots()))
-            .anyMatch(s -> s.getSlotID() == profile.getSlotID());
+        boolean active = false;
+        if (registerWorker.containsKey(profile.getWorker())) {
+            active = Arrays.stream(registerWorker.get(profile.getWorker()).getAssignedSlots())
+                .anyMatch(s -> s.getSlotID() == profile.getSlotID()); // anyMatch, not allMatch: a worker may hold several slots, and allMatch is vacuously true when it holds none
+        }
+
+        if (!active) {
+            LOGGER.info("received slot active check failed, profile: " + profile);
+        } else {
+            LOGGER.fine("received slot active check success, profile: " + profile);
+        }
+        return active;
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index c0e95bf0c..7fdfe323f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -31,9 +31,8 @@ import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.List;
@@ -43,8 +42,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class PipelineBaseScheduler implements JobScheduler {
-    private static final ILogger LOGGER = Logger.getLogger(PipelineBaseScheduler.class);
     private final PhysicalPlan physicalPlan;
 
     private final long jobId;
@@ -91,6 +90,8 @@ public class PipelineBaseScheduler implements JobScheduler {
             Map<TaskGroupLocation, SlotProfile> slotProfiles =
                 getOrApplyResourceForPipeline(pipeline, jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation()));
 
+            log.debug("slotProfiles: {}", slotProfiles);
+
             // To ensure release pipeline resource after new master node active, we need store slotProfiles first and then deploy tasks.
             jobMaster.setOwnedSlotProfiles(pipeline.getPipelineLocation(), slotProfiles);
             // deploy pipeline
@@ -125,14 +126,22 @@ public class PipelineBaseScheduler implements JobScheduler {
     private SlotProfile getOrApplyResourceForTask(@NonNull PhysicalVertex task,
                                                   Map<TaskGroupLocation, SlotProfile> ownedSlotProfiles) {
 
+        SlotProfile oldProfile; // slot previously recorded for this task group, or null when none is recorded
         if (ownedSlotProfiles == null
             || ownedSlotProfiles.isEmpty()
-            || ownedSlotProfiles.get(task.getTaskGroupLocation()) == null
-            || !resourceManager.slotActiveCheck(ownedSlotProfiles.get(task.getTaskGroupLocation()))) {
-            return applyResourceForTask(task).join();
+            || ownedSlotProfiles.get(task.getTaskGroupLocation()) == null) {
+            oldProfile = null;
+        } else {
+            oldProfile = ownedSlotProfiles.get(task.getTaskGroupLocation());
+        }
+        if (oldProfile == null || !resourceManager.slotActiveCheck(oldProfile)) {
+            SlotProfile newProfile = applyResourceForTask(task).join();
+            log.info("use new profile: {} to replace not active profile: {} for task {}", newProfile, oldProfile, task);
+            return newProfile;
         }
+        log.info("use active old profile: {} for task {}", oldProfile, task);
         task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-        return ownedSlotProfiles.get(task.getTaskGroupLocation());
+        return oldProfile;
     }
 
     private Map<TaskGroupLocation, SlotProfile> applyResourceForPipeline(@NonNull SubPlan subPlan) {
@@ -160,9 +169,7 @@ public class PipelineBaseScheduler implements JobScheduler {
                 return resourceManager.applyResource(jobId, new ResourceProfile());
             } else if (ExecutionState.CANCELING.equals(task.getExecutionState()) ||
                 ExecutionState.CANCELED.equals(task.getExecutionState())) {
-                LOGGER.info(
-                    String.format("%s be canceled, skip %s this task.", task.getTaskFullName(),
-                        ExecutionState.SCHEDULED));
+                log.info("{} be canceled, skip {} this task.", task.getTaskFullName(), ExecutionState.SCHEDULED);
                 return null;
             } else {
                 makeTaskFailed(task.getTaskGroupLocation(),
@@ -184,8 +191,7 @@ public class PipelineBaseScheduler implements JobScheduler {
             });
         } else if (ExecutionState.CANCELING.equals(task.getExecutionState()) ||
             ExecutionState.CANCELED.equals(task.getExecutionState())) {
-            LOGGER.info(
-                String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), ExecutionState.DEPLOYING));
+            log.info("{} be canceled, skip {} this task.", task.getTaskFullName(), ExecutionState.DEPLOYING);
             return null;
         } else {
             jobMaster.updateTaskExecutionState(
@@ -218,9 +224,7 @@ public class PipelineBaseScheduler implements JobScheduler {
                     deployCoordinatorFuture.toArray(new CompletableFuture[0]));
                 voidCompletableFuture.get();
                 if (!pipeline.updatePipelineState(PipelineStatus.DEPLOYING, PipelineStatus.RUNNING)) {
-                    LOGGER.info(
-                        String.format("%s turn to state %s, skip the running state.", pipeline.getPipelineFullName(),
-                            pipeline.getPipelineState()));
+                    log.info("{} turn to state {}, skip the running state.", pipeline.getPipelineFullName(), pipeline.getPipelineState());
                 }
             } catch (Exception e) {
                 makePipelineFailed(pipeline, e);
@@ -228,8 +232,8 @@ public class PipelineBaseScheduler implements JobScheduler {
         } else if (PipelineStatus.CANCELING.equals(pipeline.getPipelineState()) ||
             PipelineStatus.CANCELED.equals(pipeline.getPipelineState())) {
             // may be canceled
-            LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
-                pipeline.getPipelineState(), PipelineStatus.DEPLOYING));
+            log.info("{} turn to state {}, skip {} this pipeline.", pipeline.getPipelineFullName(),
+                pipeline.getPipelineState(), PipelineStatus.DEPLOYING);
         } else {
             makePipelineFailed(pipeline, new JobException(
                 String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
@@ -246,9 +250,8 @@ public class PipelineBaseScheduler implements JobScheduler {
         if (PipelineStatus.CANCELING.equals(pipeline.getPipelineState()) ||
             PipelineStatus.CANCELED.equals(pipeline.getPipelineState())) {
             // may be canceled
-            LOGGER.info(
-                String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
-                    pipeline.getPipelineState(), targetState));
+            log.info("{} turn to state {}, skip {} this pipeline.", pipeline.getPipelineFullName(),
+                pipeline.getPipelineState(), targetState);
         } else {
             throw new JobException(
                 String.format("%s turn to a unexpected state: %s, stop scheduler job",