You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/30 12:35:01 UTC
[seatunnel] branch dev updated: [Hotfix][Zeta] Fix IMap operation timeout bug (#4859)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 86082f952 [Hotfix][Zeta] Fix IMap operation timeout bug (#4859)
86082f952 is described below
commit 86082f952d9d7a62c6e89a9472ccad96da4fa625
Author: Eric <ga...@gmail.com>
AuthorDate: Tue May 30 20:34:55 2023 +0800
[Hotfix][Zeta] Fix IMap operation timeout bug (#4859)
---
.../seatunnel/common/utils/ExceptionUtils.java | 11 ++++++++
.../common/utils/ExceptionUtilsTest.java} | 32 ++++++++++------------
.../engine/client/job/ClientJobProxy.java | 5 ++--
.../engine/common/utils/ExceptionUtil.java | 10 +++++++
.../server/checkpoint/IMapCheckpointIDCounter.java | 4 +--
.../engine/server/dag/physical/PhysicalVertex.java | 28 +++----------------
.../engine/server/dag/physical/SubPlan.java | 24 ++++------------
7 files changed, 48 insertions(+), 66 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
index 1ee0ae53e..b8a43566f 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.common.utils;
+import lombok.NonNull;
+
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -38,4 +40,13 @@ public class ExceptionUtils {
throw new RuntimeException("Failed to print exception logs", e1);
}
}
+
+ public static Throwable getRootException(@NonNull Throwable e) {
+ Throwable cause = e.getCause();
+ if (cause != null) {
+ return getRootException(cause);
+ } else {
+ return e;
+ }
+ }
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java
similarity index 53%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
copy to seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java
index 1ee0ae53e..d0e036f09 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java
@@ -17,25 +17,21 @@
package org.apache.seatunnel.common.utils;
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-public class ExceptionUtils {
- private ExceptionUtils() {}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
- public static String getMessage(Throwable e) {
- if (e == null) {
- return "";
- }
- try (StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw)) {
- // Output the error stack information to the printWriter
- e.printStackTrace(pw);
- pw.flush();
- sw.flush();
- return sw.toString();
- } catch (Exception e1) {
- throw new RuntimeException("Failed to print exception logs", e1);
- }
+public class ExceptionUtilsTest {
+ @Test
+ public void testGetRootException() {
+ Exception exception =
+ new UnsupportedOperationException(
+ new SeaTunnelException(
+ new SeaTunnelRuntimeException(
+ CommonErrorCode.CLASS_NOT_FOUND, "class not fount")));
+ Throwable throwable = ExceptionUtils.getRootException(exception);
+ Assertions.assertTrue(throwable instanceof SeaTunnelRuntimeException);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 9331591b5..0e877f641 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -35,7 +36,6 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobComple
import org.apache.commons.lang3.StringUtils;
import com.hazelcast.client.impl.protocol.ClientMessage;
-import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
@@ -104,8 +104,7 @@ public class ClientJobProxy implements Job {
100000,
true,
exception ->
- exception.getCause()
- instanceof OperationTimeoutException,
+ ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
if (jobResult == null) {
throw new SeaTunnelEngineException("failed to fetch job result");
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index e2503fcb0..e07143493 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -16,6 +16,7 @@
package org.apache.seatunnel.engine.common.utils;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.common.utils.function.RunnableWithException;
import org.apache.seatunnel.common.utils.function.SupplierWithException;
@@ -27,6 +28,8 @@ import org.apache.commons.lang3.tuple.ImmutableTriple;
import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
import com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;
import lombok.NonNull;
@@ -142,4 +145,11 @@ public final class ExceptionUtil {
// This method wouldn't be executed.
throw new RuntimeException("Never throw here.");
}
+
+ public static boolean isOperationNeedRetryException(@NonNull Throwable e) {
+ Throwable exception = ExceptionUtils.getRootException(e);
+ return exception instanceof HazelcastInstanceNotActiveException
+ || exception instanceof InterruptedException
+ || exception instanceof OperationTimeoutException;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index ffbcea0e1..6bb679732 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -19,10 +19,10 @@ package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
-import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
@@ -56,7 +56,7 @@ public class IMapCheckpointIDCounter implements CheckpointIDCounter {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception -> exception instanceof HazelcastInstanceNotActiveException,
+ exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index ef019d610..20923cc2b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
@@ -41,8 +42,6 @@ import org.apache.commons.lang3.StringUtils;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
-import com.hazelcast.core.HazelcastInstanceNotActiveException;
-import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -366,12 +365,7 @@ public class PhysicalVertex {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
- exception instanceof OperationTimeoutException
- || exception
- instanceof
- HazelcastInstanceNotActiveException
- || exception instanceof InterruptedException,
+ exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOGGER.warning(ExceptionUtils.getMessage(e));
@@ -436,11 +430,7 @@ public class PhysicalVertex {
Constant.OPERATION_RETRY_TIME,
true,
exception ->
- exception instanceof OperationTimeoutException
- || exception
- instanceof
- HazelcastInstanceNotActiveException
- || exception instanceof InterruptedException,
+ ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOGGER.warning(ExceptionUtils.getMessage(e));
@@ -478,11 +468,6 @@ public class PhysicalVertex {
} else if (ExecutionState.CANCELING.equals(runningJobStateIMap.get(taskGroupLocation))) {
noticeTaskExecutionServiceCancel();
}
-
- LOGGER.info(
- String.format(
- "can not cancel task %s because it is in state %s ",
- taskFullName, getExecutionState()));
}
@SuppressWarnings("checkstyle:MagicNumber")
@@ -567,12 +552,7 @@ public class PhysicalVertex {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
- exception instanceof OperationTimeoutException
- || exception
- instanceof
- HazelcastInstanceNotActiveException
- || exception instanceof InterruptedException,
+ exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOGGER.warning(ExceptionUtils.getMessage(e));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 6a1938766..bc9e3e2aa 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
@@ -293,10 +294,7 @@ public class SubPlan {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
- exception instanceof OperationTimeoutException
- || exception instanceof HazelcastInstanceNotActiveException
- || exception instanceof InterruptedException,
+ exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
}
@@ -331,11 +329,7 @@ public class SubPlan {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
- exception instanceof OperationTimeoutException
- || exception
- instanceof HazelcastInstanceNotActiveException
- || exception instanceof InterruptedException,
+ exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
this.currPipelineStatus = endState;
}
@@ -390,12 +384,7 @@ public class SubPlan {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
- exception instanceof OperationTimeoutException
- || exception
- instanceof
- HazelcastInstanceNotActiveException
- || exception instanceof InterruptedException,
+ exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
this.currPipelineStatus = targetState;
return true;
@@ -532,10 +521,7 @@ public class SubPlan {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
- exception instanceof OperationTimeoutException
- || exception instanceof HazelcastInstanceNotActiveException
- || exception instanceof InterruptedException,
+ exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
}