You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/22 14:37:53 UTC

[incubator-seatunnel] branch st-engine updated: [Engine][Task] Add source register timeout and retry (#2503)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new d2dbd9e79 [Engine][Task] Add source register timeout and retry (#2503)
d2dbd9e79 is described below

commit d2dbd9e7933990b2a878003e7dd705c47031a687
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Aug 22 22:37:47 2022 +0800

    [Engine][Task] Add source register timeout and retry (#2503)
    
    * [Engine][Task] Fix task running error.
    
    * [Engine][Task] Fix task running error.
---
 .../org/apache/seatunnel/common/utils/RetryUtils.java  | 18 ++++++++++++++++++
 .../task/operation/source/SourceRegisterOperation.java | 14 +++++++++++---
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
index 28ae40d5a..f6ac9a694 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
@@ -44,6 +44,8 @@ public class RetryUtils {
                     if (retryMaterial.shouldThrowException()) {
                         throw e;
                     }
+                } else if (retryMaterial.getSleepTimeMillis() > 0) {
+                    Thread.sleep(retryMaterial.getSleepTimeMillis());
                 }
             }
         } while (i <= retryTimes);
@@ -66,10 +68,21 @@ public class RetryUtils {
         // this is the exception condition, can add result condition in the future.
         private final RetryCondition<Exception> retryCondition;
 
+        /**
+         * The interval between each retry
+         */
+        private final long sleepTimeMillis;
+
         public RetryMaterial(int retryTimes, boolean shouldThrowException, RetryCondition<Exception> retryCondition) {
+            this(retryTimes, shouldThrowException, retryCondition, 0);
+        }
+
+        public RetryMaterial(int retryTimes, boolean shouldThrowException,
+                             RetryCondition<Exception> retryCondition, long sleepTimeMillis) {
             this.retryTimes = retryTimes;
             this.shouldThrowException = shouldThrowException;
             this.retryCondition = retryCondition;
+            this.sleepTimeMillis = sleepTimeMillis;
         }
 
         public int getRetryTimes() {
@@ -83,8 +96,13 @@ public class RetryUtils {
         public RetryCondition<Exception> getRetryCondition() {
             return retryCondition;
         }
+
+        public long getSleepTimeMillis() {
+            return sleepTimeMillis;
+        }
     }
 
+    @FunctionalInterface
     public interface Execution<T, E extends Exception> {
         T execute() throws E;
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index ebc26cc69..3c78206d5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.operation.source;
 
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -36,6 +37,8 @@ import java.io.IOException;
  */
 public class SourceRegisterOperation extends Operation implements IdentifiedDataSerializable {
 
+    private static final int RETRY_TIME = 5;
+
     private TaskLocation readerTaskID;
     private TaskLocation enumeratorTaskID;
 
@@ -51,9 +54,14 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         Address readerAddress = getCallerAddress();
-        SourceSplitEnumeratorTask<?> task =
-                server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID()).getTaskGroup().getTask(enumeratorTaskID.getTaskID());
-        task.receivedReader(readerTaskID, readerAddress);
+        RetryUtils.retryWithException(() -> {
+            SourceSplitEnumeratorTask<?> task =
+                    server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID()).getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+            task.receivedReader(readerTaskID, readerAddress);
+            return null;
+        }, new RetryUtils.RetryMaterial(RETRY_TIME, true,
+                exception -> exception instanceof NullPointerException));
+
     }
 
     @Override