You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/30 12:35:01 UTC

[seatunnel] branch dev updated: [Hotfix][Zeta] Fix IMap operation timeout bug (#4859)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 86082f952 [Hotfix][Zeta] Fix IMap operation timeout bug (#4859)
86082f952 is described below

commit 86082f952d9d7a62c6e89a9472ccad96da4fa625
Author: Eric <ga...@gmail.com>
AuthorDate: Tue May 30 20:34:55 2023 +0800

    [Hotfix][Zeta] Fix IMap operation timeout bug (#4859)
---
 .../seatunnel/common/utils/ExceptionUtils.java     | 11 ++++++++
 .../common/utils/ExceptionUtilsTest.java}          | 32 ++++++++++------------
 .../engine/client/job/ClientJobProxy.java          |  5 ++--
 .../engine/common/utils/ExceptionUtil.java         | 10 +++++++
 .../server/checkpoint/IMapCheckpointIDCounter.java |  4 +--
 .../engine/server/dag/physical/PhysicalVertex.java | 28 +++----------------
 .../engine/server/dag/physical/SubPlan.java        | 24 ++++------------
 7 files changed, 48 insertions(+), 66 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
index 1ee0ae53e..b8a43566f 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.common.utils;
 
+import lombok.NonNull;
+
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
@@ -38,4 +40,13 @@ public class ExceptionUtils {
             throw new RuntimeException("Failed to print exception logs", e1);
         }
     }
+
+    public static Throwable getRootException(@NonNull Throwable e) {
+        Throwable cause = e.getCause();
+        if (cause != null) {
+            return getRootException(cause);
+        } else {
+            return e;
+        }
+    }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java
similarity index 53%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
copy to seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java
index 1ee0ae53e..d0e036f09 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java
@@ -17,25 +17,21 @@
 
 package org.apache.seatunnel.common.utils;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
-public class ExceptionUtils {
-    private ExceptionUtils() {}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-    public static String getMessage(Throwable e) {
-        if (e == null) {
-            return "";
-        }
-        try (StringWriter sw = new StringWriter();
-                PrintWriter pw = new PrintWriter(sw)) {
-            // Output the error stack information to the printWriter
-            e.printStackTrace(pw);
-            pw.flush();
-            sw.flush();
-            return sw.toString();
-        } catch (Exception e1) {
-            throw new RuntimeException("Failed to print exception logs", e1);
-        }
+public class ExceptionUtilsTest {
+    @Test
+    public void testGetRootException() {
+        Exception exception =
+                new UnsupportedOperationException(
+                        new SeaTunnelException(
+                                new SeaTunnelRuntimeException(
+                                        CommonErrorCode.CLASS_NOT_FOUND, "class not fount")));
+        Throwable throwable = ExceptionUtils.getRootException(exception);
+        Assertions.assertTrue(throwable instanceof SeaTunnelRuntimeException);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 9331591b5..0e877f641 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.Job;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -35,7 +36,6 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobComple
 import org.apache.commons.lang3.StringUtils;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
-import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
@@ -104,8 +104,7 @@ public class ClientJobProxy implements Job {
                                     100000,
                                     true,
                                     exception ->
-                                            exception.getCause()
-                                                    instanceof OperationTimeoutException,
+                                            ExceptionUtil.isOperationNeedRetryException(exception),
                                     Constant.OPERATION_RETRY_SLEEP));
             if (jobResult == null) {
                 throw new SeaTunnelEngineException("failed to fetch job result");
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index e2503fcb0..e07143493 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -16,6 +16,7 @@
 
 package org.apache.seatunnel.engine.common.utils;
 
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.function.ConsumerWithException;
 import org.apache.seatunnel.common.utils.function.RunnableWithException;
 import org.apache.seatunnel.common.utils.function.SupplierWithException;
@@ -27,6 +28,8 @@ import org.apache.commons.lang3.tuple.ImmutableTriple;
 
 import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
 import com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;
 import lombok.NonNull;
 
@@ -142,4 +145,11 @@ public final class ExceptionUtil {
         // This method wouldn't be executed.
         throw new RuntimeException("Never throw here.");
     }
+
+    public static boolean isOperationNeedRetryException(@NonNull Throwable e) {
+        Throwable exception = ExceptionUtils.getRootException(e);
+        return exception instanceof HazelcastInstanceNotActiveException
+                || exception instanceof InterruptedException
+                || exception instanceof OperationTimeoutException;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index ffbcea0e1..6bb679732 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -19,10 +19,10 @@ package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 
-import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 
@@ -56,7 +56,7 @@ public class IMapCheckpointIDCounter implements CheckpointIDCounter {
                 new RetryUtils.RetryMaterial(
                         Constant.OPERATION_RETRY_TIME,
                         true,
-                        exception -> exception instanceof HazelcastInstanceNotActiveException,
+                        exception -> ExceptionUtil.isOperationNeedRetryException(exception),
                         Constant.OPERATION_RETRY_SLEEP));
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index ef019d610..20923cc2b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -41,8 +42,6 @@ import org.apache.commons.lang3.StringUtils;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.cluster.Member;
-import com.hazelcast.core.HazelcastInstanceNotActiveException;
-import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -366,12 +365,7 @@ public class PhysicalVertex {
                         new RetryUtils.RetryMaterial(
                                 Constant.OPERATION_RETRY_TIME,
                                 true,
-                                exception ->
-                                        exception instanceof OperationTimeoutException
-                                                || exception
-                                                        instanceof
-                                                        HazelcastInstanceNotActiveException
-                                                || exception instanceof InterruptedException,
+                                exception -> ExceptionUtil.isOperationNeedRetryException(exception),
                                 Constant.OPERATION_RETRY_SLEEP));
             } catch (Exception e) {
                 LOGGER.warning(ExceptionUtils.getMessage(e));
@@ -436,11 +430,7 @@ public class PhysicalVertex {
                                     Constant.OPERATION_RETRY_TIME,
                                     true,
                                     exception ->
-                                            exception instanceof OperationTimeoutException
-                                                    || exception
-                                                            instanceof
-                                                            HazelcastInstanceNotActiveException
-                                                    || exception instanceof InterruptedException,
+                                            ExceptionUtil.isOperationNeedRetryException(exception),
                                     Constant.OPERATION_RETRY_SLEEP));
                 } catch (Exception e) {
                     LOGGER.warning(ExceptionUtils.getMessage(e));
@@ -478,11 +468,6 @@ public class PhysicalVertex {
         } else if (ExecutionState.CANCELING.equals(runningJobStateIMap.get(taskGroupLocation))) {
             noticeTaskExecutionServiceCancel();
         }
-
-        LOGGER.info(
-                String.format(
-                        "can not cancel task %s because it is in state %s ",
-                        taskFullName, getExecutionState()));
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -567,12 +552,7 @@ public class PhysicalVertex {
                         new RetryUtils.RetryMaterial(
                                 Constant.OPERATION_RETRY_TIME,
                                 true,
-                                exception ->
-                                        exception instanceof OperationTimeoutException
-                                                || exception
-                                                        instanceof
-                                                        HazelcastInstanceNotActiveException
-                                                || exception instanceof InterruptedException,
+                                exception -> ExceptionUtil.isOperationNeedRetryException(exception),
                                 Constant.OPERATION_RETRY_SLEEP));
             } catch (Exception e) {
                 LOGGER.warning(ExceptionUtils.getMessage(e));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 6a1938766..bc9e3e2aa 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
@@ -293,10 +294,7 @@ public class SubPlan {
                 new RetryUtils.RetryMaterial(
                         Constant.OPERATION_RETRY_TIME,
                         true,
-                        exception ->
-                                exception instanceof OperationTimeoutException
-                                        || exception instanceof HazelcastInstanceNotActiveException
-                                        || exception instanceof InterruptedException,
+                        exception -> ExceptionUtil.isOperationNeedRetryException(exception),
                         Constant.OPERATION_RETRY_SLEEP));
     }
 
@@ -331,11 +329,7 @@ public class SubPlan {
                     new RetryUtils.RetryMaterial(
                             Constant.OPERATION_RETRY_TIME,
                             true,
-                            exception ->
-                                    exception instanceof OperationTimeoutException
-                                            || exception
-                                                    instanceof HazelcastInstanceNotActiveException
-                                            || exception instanceof InterruptedException,
+                            exception -> ExceptionUtil.isOperationNeedRetryException(exception),
                             Constant.OPERATION_RETRY_SLEEP));
             this.currPipelineStatus = endState;
         }
@@ -390,12 +384,7 @@ public class SubPlan {
                         new RetryUtils.RetryMaterial(
                                 Constant.OPERATION_RETRY_TIME,
                                 true,
-                                exception ->
-                                        exception instanceof OperationTimeoutException
-                                                || exception
-                                                        instanceof
-                                                        HazelcastInstanceNotActiveException
-                                                || exception instanceof InterruptedException,
+                                exception -> ExceptionUtil.isOperationNeedRetryException(exception),
                                 Constant.OPERATION_RETRY_SLEEP));
                 this.currPipelineStatus = targetState;
                 return true;
@@ -532,10 +521,7 @@ public class SubPlan {
                 new RetryUtils.RetryMaterial(
                         Constant.OPERATION_RETRY_TIME,
                         true,
-                        exception ->
-                                exception instanceof OperationTimeoutException
-                                        || exception instanceof HazelcastInstanceNotActiveException
-                                        || exception instanceof InterruptedException,
+                        exception -> ExceptionUtil.isOperationNeedRetryException(exception),
                         Constant.OPERATION_RETRY_SLEEP));
     }