You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/21 23:06:14 UTC

[gobblin] branch master updated: Use root cause for checking if exception is transient (#3585)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 05aa41c61 Use root cause for checking if exception is transient (#3585)
05aa41c61 is described below

commit 05aa41c61f3952362d84cfb4200ef2c361b9c748
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Fri Oct 21 16:06:09 2022 -0700

    Use root cause for checking if exception is transient (#3585)
---
 .../java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java   | 2 +-
 .../org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java    | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 6773b7cad..5f77f14cf 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -439,7 +439,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
    * to avoid advancing watermarks and skipping GMCEs unnecessarily.
    */
   public static boolean isExceptionTransient(Exception e, Set<String> transientExceptionMessages) {
-    return transientExceptionMessages.stream().anyMatch(message -> e.getMessage().contains(message));
+    return transientExceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message));
   }
 
   /**
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
index e1c85f580..820fbe684 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.function.BiConsumer;
 import lombok.SneakyThrows;
 import org.apache.gobblin.configuration.State;
@@ -217,11 +218,15 @@ public class GobblinMCEWriterTest extends PowerMockTestCase {
 
   @Test
   public void testDetectTransientException() {
-    Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout");
+    Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout", "RejectedExecutionException");
     IOException transientException = new IOException("test1 Filesystem closed test");
+    IOException wrapperException = new IOException("wrapper exception", transientException);
     Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException, transientExceptions));
+    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(wrapperException, transientExceptions));
     IOException nonTransientException = new IOException("Write failed due to bad schema");
     Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException, transientExceptions));
+    RejectedExecutionException rejectedExecutionException = new RejectedExecutionException("");
+    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(rejectedExecutionException, transientExceptions));
   }
 
   @DataProvider(name="AllowMockMetadataWriter")