You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/21 23:06:14 UTC
[gobblin] branch master updated: Use root cause for checking if exception is transient (#3585)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 05aa41c61 Use root cause for checking if exception is transient (#3585)
05aa41c61 is described below
commit 05aa41c61f3952362d84cfb4200ef2c361b9c748
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Fri Oct 21 16:06:09 2022 -0700
Use root cause for checking if exception is transient (#3585)
---
.../java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java | 2 +-
.../org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java | 7 ++++++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 6773b7cad..5f77f14cf 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -439,7 +439,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
* to avoid advancing watermarks and skipping GMCEs unnecessarily.
*/
public static boolean isExceptionTransient(Exception e, Set<String> transientExceptionMessages) {
- return transientExceptionMessages.stream().anyMatch(message -> e.getMessage().contains(message));
+ return transientExceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message));
}
/**
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
index e1c85f580..820fbe684 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiConsumer;
import lombok.SneakyThrows;
import org.apache.gobblin.configuration.State;
@@ -217,11 +218,15 @@ public class GobblinMCEWriterTest extends PowerMockTestCase {
@Test
public void testDetectTransientException() {
- Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout");
+ Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout", "RejectedExecutionException");
IOException transientException = new IOException("test1 Filesystem closed test");
+ IOException wrapperException = new IOException("wrapper exception", transientException);
Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException, transientExceptions));
+ Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(wrapperException, transientExceptions));
IOException nonTransientException = new IOException("Write failed due to bad schema");
Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException, transientExceptions));
+ RejectedExecutionException rejectedExecutionException = new RejectedExecutionException("");
+ Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(rejectedExecutionException, transientExceptions));
}
@DataProvider(name="AllowMockMetadataWriter")