You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/06 16:01:58 UTC

[incubator-seatunnel] branch dev updated: [hotfix][engine] fix task operation retry timeout error (#3009)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a01e4a243 [hotfix][engine] fix task operation retry timeout error (#3009)
a01e4a243 is described below

commit a01e4a2431124d791cfa1192dd41aff29dcf81e2
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Oct 7 00:01:53 2022 +0800

    [hotfix][engine] fix task operation retry timeout error (#3009)
---
 .../seatunnel/engine/server/SeaTunnelServer.java       | 18 ++++++++++++++++++
 .../operation/CheckpointBarrierTriggerOperation.java   |  3 ++-
 .../operation/CheckpointFinishedOperation.java         |  3 ++-
 .../operation/NotifyTaskRestoreOperation.java          |  3 ++-
 .../checkpoint/operation/NotifyTaskStartOperation.java |  3 ++-
 .../operation/checkpoint/CloseRequestOperation.java    |  3 ++-
 .../task/operation/source/AssignSplitOperation.java    |  3 ++-
 .../task/operation/source/RequestSplitOperation.java   |  3 ++-
 .../task/operation/source/RestoredSplitOperation.java  |  6 ++++--
 .../operation/source/SourceNoMoreElementOperation.java |  3 ++-
 .../task/operation/source/SourceRegisterOperation.java |  3 ++-
 11 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 8c1f1ebae..ead141dac 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
 import org.apache.seatunnel.engine.server.service.slot.SlotService;
 
@@ -30,6 +32,7 @@ import com.hazelcast.internal.services.MembershipServiceEvent;
 import com.hazelcast.jet.impl.LiveOperationRegistry;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.LiveOperations;
@@ -172,6 +175,21 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
         return taskExecutionService;
     }
 
+    /**
+     * Returns whether the task group's state recorded in the "runningJobState" IMap is an end state.
+     * @param taskGroupLocation key looked up in "runningJobState"; a non-null value is cast to {@link ExecutionState}
+     * @return true if a state is recorded and {@link ExecutionState#isEndState()} is true; false otherwise
+     */
+    public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
+        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+        if (runningJobState == null) { // NOTE(review): getMap() likely never returns null (map created on demand) - confirm
+            return false;
+        }
+
+        Object taskState = runningJobState.get(taskGroupLocation);
+        return taskState == null ? false : ((ExecutionState) taskState).isEndState();
+    }
+
     private void printExecutionInfo() {
         ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
         int activeCount = threadPoolExecutor.getActiveCount();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
index 52fefe1cd..9383e725a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
@@ -81,6 +81,7 @@ public class CheckpointBarrierTriggerOperation extends TaskOperation {
             }
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(taskLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index 00e82aa95..422190a7d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -89,6 +89,7 @@ public class CheckpointFinishedOperation extends TaskOperation {
             }
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(taskLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
index faad04120..9bed053ff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
@@ -88,6 +88,7 @@ public class NotifyTaskRestoreOperation extends TaskOperation {
             }
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(taskLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
index 662226f9c..ace26fb92 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
@@ -53,6 +53,7 @@ public class NotifyTaskStartOperation extends TaskOperation {
             task.startCall();
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(taskLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
index 3f005648e..859403b66 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
@@ -50,7 +50,8 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
             task.close();
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(readerLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 9bcd5bda9..3eb20042b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -58,7 +58,8 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
             task.receivedSourceSplit(Arrays.stream(o).map(i -> (SplitT) i).collect(Collectors.toList()));
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(taskID.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index ba853833f..13536215d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -54,7 +54,8 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
             task.requestSplit(taskID.getTaskID());
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 5dd330ca3..30df27151 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -75,7 +75,8 @@ public class RestoredSplitOperation extends TaskOperation {
 
     @Override
     public void run() throws Exception {
-        TaskExecutionService taskExecutionService = ((SeaTunnelServer) getService()).getTaskExecutionService();
+        SeaTunnelServer server = getService();
+        TaskExecutionService taskExecutionService = server.getTaskExecutionService();
         ClassLoader classLoader = taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
         List<SourceSplit> deserialize = Arrays.asList(SerializationUtils.deserialize(splits, classLoader));
         RetryUtils.retryWithException(() -> {
@@ -83,6 +84,7 @@ public class RestoredSplitOperation extends TaskOperation {
             task.addSplitsBack(deserialize, subtaskIndex);
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(taskLocation.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 0cba1c1f7..1886d7be4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -53,7 +53,8 @@ public class SourceNoMoreElementOperation extends Operation implements Identifie
             task.readerFinished(currentTaskID.getTaskID());
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()), Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index f3c88c25b..005cda41e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -61,7 +61,8 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
             task.receivedReader(readerTaskID, readerAddress);
             return null;
         }, new RetryUtils.RetryMaterial(RETRY_TIME, true,
-            exception -> exception instanceof NullPointerException, RETRY_TIME_OUT));
+            exception -> exception instanceof NullPointerException &&
+                !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()), RETRY_TIME_OUT));
     }
 
     @Override