You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/13 22:47:22 UTC
[gobblin] branch master updated: Fail GMIP container for known transient exceptions to avoid data loss (#3576)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5e221ed9c Fail GMIP container for known transient exceptions to avoid data loss (#3576)
5e221ed9c is described below
commit 5e221ed9c9290008e61eb87d13c3e93f606b2606
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Oct 13 15:47:15 2022 -0700
Fail GMIP container for known transient exceptions to avoid data loss (#3576)
---
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 20 +++++++++++++++++++-
.../gobblin/iceberg/writer/GobblinMCEWriterTest.java | 12 ++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 04ef762bd..6773b7cad 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -43,6 +43,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
@@ -105,6 +106,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
public static final String HIVE_PARTITION_NAME = "hive.partition.name";
public static final String GMCE_METADATA_WRITER_CLASSES = "gmce.metadata.writer.classes";
public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET = "gmce.metadata.writer.max.error.dataset";
+ public static final String TRANSIENT_EXCEPTION_MESSAGES_KEY = "gmce.metadata.writer.transient.exception.messages";
public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0;
public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
public static final String TABLE_NAME_DELIMITER = ".";
@@ -125,6 +127,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
@Setter
private int maxErrorDataset;
protected EventSubmitter eventSubmitter;
+ private final Set<String> transientExceptionMessages;
@AllArgsConstructor
static class TableStatus {
@@ -157,6 +160,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
+ transientExceptionMessages = new HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
}
@Override
@@ -335,6 +339,9 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
try {
writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
} catch (Exception e) {
+ if (isExceptionTransient(e, transientExceptionMessages)) {
+ throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
+ }
meetException = true;
writer.reset(dbName, tableName);
addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
@@ -405,6 +412,9 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
try {
writer.flush(dbName, tableName);
} catch (IOException e) {
+ if (isExceptionTransient(e, transientExceptionMessages)) {
+ throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
+ }
meetException = true;
writer.reset(dbName, tableName);
addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
@@ -424,6 +434,14 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
}
}
+ /**
+ * Check if exception is contained within a known list of transient exceptions. These exceptions should not be caught
+ * to avoid advancing watermarks and skipping GMCEs unnecessarily.
+ */
+ public static boolean isExceptionTransient(Exception e, Set<String> transientExceptionMessages) {
+ return transientExceptionMessages.stream().anyMatch(message -> e.getMessage().contains(message));
+ }
+
/**
* Call the metadata writers to do flush each table metadata.
* Flush of metadata writer is the place that do real metadata
@@ -526,7 +544,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.PARTITION_KEYS, Joiner.on(',').join(exception.partitionKeys.stream()
.map(HiveRegistrationUnit.Column::getName).collect(Collectors.toList())));
- String message = exception.getCause() == null ? exception.getMessage() : exception.getCause().getMessage();
+ String message = Throwables.getRootCause(exception).getMessage();
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.EXCEPTION_MESSAGE_KEY_NAME, message);
eventSubmitter.submit(gobblinTrackingEvent);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
index 32e8a34e1..e1c85f580 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import lombok.SneakyThrows;
@@ -56,6 +57,8 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
+import com.google.common.collect.Sets;
+
import static org.mockito.Mockito.*;
@@ -212,6 +215,15 @@ public class GobblinMCEWriterTest extends PowerMockTestCase {
Assert.assertEquals(allowedWriters.get(1).getClass().getName(), exceptionWriter.getClass().getName());
}
+ @Test
+ public void testDetectTransientException() {
+ Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout");
+ IOException transientException = new IOException("test1 Filesystem closed test");
+ Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException, transientExceptions));
+ IOException nonTransientException = new IOException("Write failed due to bad schema");
+ Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException, transientExceptions));
+ }
+
@DataProvider(name="AllowMockMetadataWriter")
public Object[][] allowMockMetadataWriterParams() {
initMocks();