You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/13 22:47:22 UTC

[gobblin] branch master updated: Fail GMIP container for known transient exceptions to avoid data loss (#3576)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e221ed9c Fail GMIP container for known transient exceptions to avoid data loss (#3576)
5e221ed9c is described below

commit 5e221ed9c9290008e61eb87d13c3e93f606b2606
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Oct 13 15:47:15 2022 -0700

    Fail GMIP container for known transient exceptions to avoid data loss (#3576)
---
 .../gobblin/iceberg/writer/GobblinMCEWriter.java     | 20 +++++++++++++++++++-
 .../gobblin/iceberg/writer/GobblinMCEWriterTest.java | 12 ++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 04ef762bd..6773b7cad 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -43,6 +43,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Iterables;
@@ -105,6 +106,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   public static final String HIVE_PARTITION_NAME = "hive.partition.name";
   public static final String GMCE_METADATA_WRITER_CLASSES = "gmce.metadata.writer.classes";
   public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET = "gmce.metadata.writer.max.error.dataset";
+  public static final String TRANSIENT_EXCEPTION_MESSAGES_KEY = "gmce.metadata.writer.transient.exception.messages";
   public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0;
   public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
   public static final String TABLE_NAME_DELIMITER = ".";
@@ -125,6 +127,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   @Setter
   private int maxErrorDataset;
   protected EventSubmitter eventSubmitter;
+  private final Set<String> transientExceptionMessages;
 
   @AllArgsConstructor
   static class TableStatus {
@@ -157,6 +160,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
     tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
     MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
     eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
+    transientExceptionMessages = new HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
   }
 
   @Override
@@ -335,6 +339,9 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
         try {
           writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
         } catch (Exception e) {
+          if (isExceptionTransient(e, transientExceptionMessages)) {
+            throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
+          }
           meetException = true;
           writer.reset(dbName, tableName);
           addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
@@ -405,6 +412,9 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
         try {
           writer.flush(dbName, tableName);
         } catch (IOException e) {
+          if (isExceptionTransient(e, transientExceptionMessages)) {
+            throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
+          }
           meetException = true;
           writer.reset(dbName, tableName);
           addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
@@ -424,6 +434,14 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
     }
   }
 
+  /**
+   * Check if the exception message contains one of a known list of transient exception messages (null messages and
+   * empty patterns never match). These exceptions should not be caught to avoid advancing watermarks and skipping GMCEs.
+   */
+  public static boolean isExceptionTransient(Exception e, Set<String> transientExceptionMessages) {
+    return e.getMessage() != null && transientExceptionMessages.stream().anyMatch(m -> !m.isEmpty() && e.getMessage().contains(m));
+  }
+
   /**
    * Call the metadata writers to do flush each table metadata.
    * Flush of metadata writer is the place that do real metadata
@@ -526,7 +544,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
       gobblinTrackingEvent.addMetadata(MetadataWriterKeys.PARTITION_KEYS, Joiner.on(',').join(exception.partitionKeys.stream()
           .map(HiveRegistrationUnit.Column::getName).collect(Collectors.toList())));
 
-      String message = exception.getCause() == null ? exception.getMessage() : exception.getCause().getMessage();
+      String message = Throwables.getRootCause(exception).getMessage();
       gobblinTrackingEvent.addMetadata(MetadataWriterKeys.EXCEPTION_MESSAGE_KEY_NAME, message);
 
       eventSubmitter.submit(gobblinTrackingEvent);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
index 32e8a34e1..e1c85f580 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import lombok.SneakyThrows;
@@ -56,6 +57,8 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Sets;
+
 import static org.mockito.Mockito.*;
 
 
@@ -212,6 +215,15 @@ public class GobblinMCEWriterTest extends PowerMockTestCase {
     Assert.assertEquals(allowedWriters.get(1).getClass().getName(), exceptionWriter.getClass().getName());
   }
 
+  /** A message is transient when it contains any configured transient message as a substring; otherwise it is not. */
+  @Test
+  public void testDetectTransientException() {
+    Set<String> knownTransientMessages = Sets.newHashSet("Filesystem closed", "Hive timeout");
+    // "Filesystem closed" sits in the middle of this message, so substring matching must still flag it.
+    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(new IOException("test1 Filesystem closed test"), knownTransientMessages));
+    Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(new IOException("Write failed due to bad schema"), knownTransientMessages));
+  }
+
   @DataProvider(name="AllowMockMetadataWriter")
   public Object[][] allowMockMetadataWriterParams() {
     initMocks();