You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/05/05 17:27:47 UTC
[gobblin] branch master updated: [GOBBLIN-1637] Add writer, operation, and partition info to failed metadata writer events (#3498)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ffcab02df [GOBBLIN-1637] Add writer, operation, and partition info to failed metadata writer events (#3498)
ffcab02df is described below
commit ffcab02df0fca192d25cbd8be14201d9c97f2c91
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu May 5 10:27:42 2022 -0700
[GOBBLIN-1637] Add writer, operation, and partition info to failed metadata writer events (#3498)
* Add writer, operation, and partition info to failed metadata writer events
* Add partitionKeys to failure event
* Add all failed writers to event
---
.../gobblin/hive/writer/HiveMetadataWriter.java | 30 +++++-
...veMetadataWriterWithPartitionInfoException.java | 27 ++---
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 111 ++++++++++++++-------
.../iceberg/writer/GobblinMetadataException.java | 21 +++-
.../iceberg/writer/IcebergMCEMetadataKeys.java | 5 +
.../iceberg/writer/IcebergMetadataWriterTest.java | 4 +-
6 files changed, 135 insertions(+), 63 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 5ed0d47cf..7a3595184 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -26,12 +26,16 @@ import com.google.common.io.Closer;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
@@ -114,8 +118,12 @@ public class HiveMetadataWriter implements MetadataWriter {
for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) {
try {
execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- throw new RuntimeException("Error when getting the result of registration for table" + tableKey, e);
+ } catch (TimeoutException e) {
+ // Since TimeoutException should always be a transient issue, throw RuntimeException which will fail/retry container
+ throw new RuntimeException("Timeout waiting for result of registration for table " + tableKey, e);
+ } catch (InterruptedException | ExecutionException e) {
+ Set<String> partitions = executionMap.keySet().stream().flatMap(List::stream).collect(Collectors.toSet());
+ throw new HiveMetadataWriterWithPartitionInfoException(partitions, Collections.emptySet(), e);
}
}
executionMap.clear();
@@ -243,7 +251,7 @@ public class HiveMetadataWriter implements MetadataWriter {
try {
executionMap.get(partitionValue).get(timeOutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.error("Error when getting the result of registration for table" + tableKey);
+ log.error("Error when getting the result of registration for table " + tableKey);
throw new RuntimeException(e);
}
}
@@ -326,13 +334,27 @@ public class HiveMetadataWriter implements MetadataWriter {
GobblinMetadataChangeEvent gmce =
(GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
- write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
+ try {
+ write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
+ } catch (IOException e) {
+ throw new HiveMetadataWriterWithPartitionInfoException(getPartitionValues(newSpecsMap), getPartitionValues(oldSpecsMap), e);
+ }
} else {
log.debug(String.format("Skip table %s.%s since it's not selected", tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName()));
}
}
+ /**
+ * Extract a unique list of partition values as strings from a map of HiveSpecs.
+ */
+ public Set<String> getPartitionValues(Map<String, Collection<HiveSpec>> specMap) {
+ Set<HiveSpec> hiveSpecs = specMap.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ Set<List<String>> partitionValueLists = hiveSpecs.stream().filter(spec -> spec.getPartition().isPresent())
+ .map(spec -> spec.getPartition().get().getValues()).collect(Collectors.toSet());
+ return partitionValueLists.stream().flatMap(List::stream).collect(Collectors.toSet());
+ }
+
@Override
public void close() throws IOException {
this.closer.close();
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriterWithPartitionInfoException.java
similarity index 51%
copy from gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
copy to gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriterWithPartitionInfoException.java
index 90f1e6379..1b0b2c47d 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriterWithPartitionInfoException.java
@@ -15,26 +15,19 @@
* limitations under the License.
*/
-package org.apache.gobblin.iceberg.writer;
+package org.apache.gobblin.hive.writer;
import java.io.IOException;
+import java.util.Set;
-public class GobblinMetadataException extends IOException {
- public String datasetPath;
- public String dbName;
- public String tableName;
- public String GMCETopicPartition;
- public long highWatermark;
- public long lowWatermark;
- public Exception exception;
- GobblinMetadataException(String datasetPath, String dbName, String tableName, String GMCETopicPartition, long lowWatermark, long highWatermark, Exception exception) {
- super(String.format("failed to flush table %s, %s", dbName, tableName), exception);
- this.datasetPath = datasetPath;
- this.dbName = dbName;
- this.tableName = tableName;
- this.GMCETopicPartition = GMCETopicPartition;
- this.highWatermark = highWatermark;
- this.lowWatermark = lowWatermark;
+public class HiveMetadataWriterWithPartitionInfoException extends IOException {
+ public Set<String> addedPartitionValues;
+ public Set<String> droppedPartitionValues;
+
+ HiveMetadataWriterWithPartitionInfoException(Set<String> addedPartitionValues, Set<String> droppedPartitionValues, Exception exception) {
+ super(exception);
+ this.addedPartitionValues = addedPartitionValues;
+ this.droppedPartitionValues = droppedPartitionValues;
}
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index f9bb21c97..d0510977b 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -57,9 +57,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.HiveMetadataWriterWithPartitionInfoException;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metadata.DataFile;
@@ -109,13 +111,14 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
List<MetadataWriter> metadataWriters;
Map<String, TableStatus> tableOperationTypeMap;
@Getter
- Map<String, Map<String, GobblinMetadataException>> datasetErrorMap;
+ Map<String, Map<String, List<GobblinMetadataException>>> datasetErrorMap;
Set<String> acceptedClusters;
protected State state;
private final ParallelRunner parallelRunner;
private int parallelRunnerTimeoutMills;
private Map<String, Cache<String, Collection<HiveSpec>>> oldSpecsMaps;
private Map<String, Cache<String, Collection<HiveSpec>>> newSpecsMaps;
+ private Map<String, List<HiveRegistrationUnit.Column>> partitionKeysMap;
private Closer closer = Closer.create();
protected final AtomicLong recordCount = new AtomicLong(0L);
@Setter
@@ -136,6 +139,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
oldSpecsMaps = new HashMap<>();
metadataWriters = new ArrayList<>();
datasetErrorMap = new HashMap<>();
+ partitionKeysMap = new HashMap<>();
acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());
state = properties;
maxErrorDataset = state.getPropAsInt(GMCE_METADATA_WRITER_MAX_ERROR_DATASET, DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET);
@@ -281,6 +285,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
String dbName = spec.getTable().getDbName();
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
+ partitionKeysMap.put(tableString, spec.getTable().getPartitionKeys());
if (!tableOperationTypeMap.containsKey(tableString)) {
tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(),
@@ -331,7 +336,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
} catch (Exception e) {
meetException = true;
writer.reset(dbName, tableName);
- addOrThrowException(e, tableString, dbName, tableName);
+ addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
}
}
}
@@ -355,15 +360,26 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
.collect(Collectors.toList());
}
- private void addOrThrowException(Exception e, String tableString, String dbName, String tableName) throws IOException{
+ private void addOrThrowException(Exception e, String tableString, String dbName, String tableName, List<String> failedWriters) throws IOException {
TableStatus tableStatus = tableOperationTypeMap.get(tableString);
- Map<String, GobblinMetadataException> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
- if (tableErrorMap.containsKey(tableString)) {
- tableErrorMap.get(tableString).highWatermark = tableStatus.gmceHighWatermark;
+ Map<String, List<GobblinMetadataException>> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
+ GobblinMetadataException lastException = null;
+ if (tableErrorMap.containsKey(tableString) && !tableErrorMap.get(tableString).isEmpty()) {
+ lastException = tableErrorMap.get(tableString).get(tableErrorMap.get(tableString).size() - 1);
} else {
- GobblinMetadataException gobblinMetadataException =
- new GobblinMetadataException(tableStatus.datasetPath, dbName, tableName, tableStatus.gmceTopicPartition, tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
- tableErrorMap.put(tableString, gobblinMetadataException);
+ tableErrorMap.put(tableString, new ArrayList<>());
+ }
+ // If operationType has changed, add a new exception to the list so that each failure event represents an offset range all containing the same operation
+ if (lastException != null && lastException.operationType.equals(tableStatus.operationType)) {
+ lastException.highWatermark = tableStatus.gmceHighWatermark;
+ } else {
+ lastException = new GobblinMetadataException(tableStatus.datasetPath, dbName, tableName, tableStatus.gmceTopicPartition,
+ tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, failedWriters, tableStatus.operationType, partitionKeysMap.get(tableString), e);
+ tableErrorMap.get(tableString).add(lastException);
+ }
+ if (e instanceof HiveMetadataWriterWithPartitionInfoException) {
+ lastException.addedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException) e).addedPartitionValues);
+ lastException.droppedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException) e).droppedPartitionValues);
}
this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap);
tableOperationTypeMap.remove(tableString);
@@ -391,18 +407,20 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
} catch (IOException e) {
meetException = true;
writer.reset(dbName, tableName);
- addOrThrowException(e, tableString, dbName, tableName);
+ addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
}
}
}
- String datasetPath = tableOperationTypeMap.get(tableString).datasetPath;
- if (!meetException && datasetErrorMap.containsKey(datasetPath) && datasetErrorMap.get(datasetPath).containsKey(tableString)) {
- // We only want to emit GTE when the table watermark moves. There can be two scenario that watermark move, one is after one flush interval,
- // we commit new watermark to state store, anther is here, where during the flush interval, we flush table because table operation changes.
- // Under this condition, error map contains this dataset means we met error before this flush, but this time when flush succeed and
- // the watermark inside the table moves, so we want to emit GTE to indicate there is some data loss here
- submitFailureEvent(datasetErrorMap.get(datasetPath).get(tableString));
- this.datasetErrorMap.get(datasetPath).remove(tableString);
+ if (!meetException) {
+ String datasetPath = tableOperationTypeMap.get(tableString).datasetPath;
+ if (datasetErrorMap.containsKey(datasetPath) && datasetErrorMap.get(datasetPath).containsKey(tableString)) {
+ // We only want to emit GTE when the table watermark moves. There can be two scenario that watermark move, one is after one flush interval,
+ // we commit new watermark to state store, anther is here, where during the flush interval, we flush table because table operation changes.
+ // Under this condition, error map contains this dataset means we met error before this flush, but this time when flush succeed and
+ // the watermark inside the table moves, so we want to emit GTE to indicate there is some data loss here
+ submitFailureEvents(datasetErrorMap.get(datasetPath).get(tableString));
+ this.datasetErrorMap.get(datasetPath).remove(tableString);
+ }
}
}
@@ -428,9 +446,9 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
tableOperationTypeMap.clear();
recordCount.lazySet(0L);
// Emit events for all current errors, since the GMCE watermark will be advanced
- for (Map.Entry<String, Map<String, GobblinMetadataException>> entry : datasetErrorMap.entrySet()) {
- for (GobblinMetadataException exception : entry.getValue().values()) {
- submitFailureEvent(exception);
+ for (Map.Entry<String, Map<String, List<GobblinMetadataException>>> entry : datasetErrorMap.entrySet()) {
+ for (List<GobblinMetadataException> exceptionList : entry.getValue().values()) {
+ submitFailureEvents(exceptionList);
}
entry.getValue().clear();
}
@@ -483,23 +501,40 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
}
/**
- * Submit event indicating that a specific set of GMCEs have been skipped, so there is a gap in the registration
+ * Submit events indicating that a specific set of GMCEs have been skipped, so there is a gap in the registration
*/
- private void submitFailureEvent(GobblinMetadataException exception) {
- log.warn(String.format("Sending GTE to indicate table flush failure for %s.%s", exception.dbName, exception.tableName));
-
- GobblinEventBuilder gobblinTrackingEvent = new GobblinEventBuilder(IcebergMCEMetadataKeys.METADATA_WRITER_FAILURE_EVENT);
-
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH, exception.datasetPath);
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME, exception.dbName);
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME, exception.tableName);
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, exception.GMCETopicPartition.split("-")[0]);
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition.split("-")[1]);
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, Long.toString(exception.highWatermark));
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, Long.toString(exception.lowWatermark));
- String message = exception.getCause() == null ? exception.getMessage() : exception.getCause().getMessage();
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.EXCEPTION_MESSAGE_KEY_NAME, message);
-
- eventSubmitter.submit(gobblinTrackingEvent);
+ private void submitFailureEvents(List<GobblinMetadataException> exceptionList) {
+ if (exceptionList.isEmpty()) {
+ return;
+ }
+ log.warn(String.format("Sending GTEs to indicate table flush failure for %s.%s", exceptionList.get(0).dbName, exceptionList.get(0).tableName));
+
+ for (GobblinMetadataException exception : exceptionList) {
+ GobblinEventBuilder gobblinTrackingEvent = new GobblinEventBuilder(IcebergMCEMetadataKeys.METADATA_WRITER_FAILURE_EVENT);
+
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH, exception.datasetPath);
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME, exception.dbName);
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME, exception.tableName);
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, exception.GMCETopicPartition.split("-")[0]);
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition.split("-")[1]);
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, Long.toString(exception.highWatermark));
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, Long.toString(exception.lowWatermark));
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILED_WRITERS_KEY, Joiner.on(',').join(exception.failedWriters));
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.OPERATION_TYPE_KEY, exception.operationType.toString());
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.ADDED_PARTITION_VALUES_KEY, Joiner.on(',').join(exception.addedPartitionValues));
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DROPPED_PARTITION_VALUES_KEY, Joiner.on(',').join(exception.droppedPartitionValues));
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.PARTITION_KEYS, Joiner.on(',').join(exception.partitionKeys.stream()
+ .map(HiveRegistrationUnit.Column::getName).collect(Collectors.toList())));
+
+ String message = exception.getCause() == null ? exception.getMessage() : exception.getCause().getMessage();
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.EXCEPTION_MESSAGE_KEY_NAME, message);
+
+ eventSubmitter.submit(gobblinTrackingEvent);
+ }
+ }
+
+ private List<String> getFailedWriterList(MetadataWriter failedWriter) {
+ List<MetadataWriter> failedWriters = metadataWriters.subList(metadataWriters.indexOf(failedWriter), metadataWriters.size());
+ return failedWriters.stream().map(writer -> writer.getClass().getName()).collect(Collectors.toList());
}
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
index 90f1e6379..98f10e4ce 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
@@ -18,6 +18,12 @@
package org.apache.gobblin.iceberg.writer;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.metadata.OperationType;
public class GobblinMetadataException extends IOException {
@@ -27,8 +33,14 @@ public class GobblinMetadataException extends IOException {
public String GMCETopicPartition;
public long highWatermark;
public long lowWatermark;
- public Exception exception;
- GobblinMetadataException(String datasetPath, String dbName, String tableName, String GMCETopicPartition, long lowWatermark, long highWatermark, Exception exception) {
+ public List<String> failedWriters;
+ public OperationType operationType;
+ public Set<String> addedPartitionValues;
+ public Set<String> droppedPartitionValues;
+ public List<HiveRegistrationUnit.Column> partitionKeys;
+
+ GobblinMetadataException(String datasetPath, String dbName, String tableName, String GMCETopicPartition, long lowWatermark, long highWatermark,
+ List<String> failedWriters, OperationType operationType, List<HiveRegistrationUnit.Column> partitionKeys, Exception exception) {
super(String.format("failed to flush table %s, %s", dbName, tableName), exception);
this.datasetPath = datasetPath;
this.dbName = dbName;
@@ -36,5 +48,10 @@ public class GobblinMetadataException extends IOException {
this.GMCETopicPartition = GMCETopicPartition;
this.highWatermark = highWatermark;
this.lowWatermark = lowWatermark;
+ this.failedWriters = failedWriters;
+ this.operationType = operationType;
+ this.addedPartitionValues = new HashSet<>();
+ this.droppedPartitionValues = new HashSet<>();
+ this.partitionKeys = partitionKeys;
}
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
index f925b3689..631bfcaa9 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
@@ -36,6 +36,11 @@ public class IcebergMCEMetadataKeys {
public static final String FAILURE_EVENT_TABLE_NAME = "tableName";
public static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
public static final String EXCEPTION_MESSAGE_KEY_NAME = "exceptionMessage";
+ public static final String FAILED_WRITERS_KEY = "failedWriters";
+ public static final String OPERATION_TYPE_KEY = "operationType";
+ public static final String ADDED_PARTITION_VALUES_KEY = "failedToAddPartitionValues";
+ public static final String DROPPED_PARTITION_VALUES_KEY = "failedToDropPartitionValues";
+ public static final String PARTITION_KEYS = "partitionKeys";
private IcebergMCEMetadataKeys() {
}
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 288d689ea..43fd821b4 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -365,10 +365,10 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 1);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
.get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
- .get("hivedb.testIcebergTable").lowWatermark, 50L);
+ .get("hivedb.testIcebergTable").get(0).lowWatermark, 50L);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
.get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
- .get("hivedb.testIcebergTable").highWatermark, 52L);
+ .get("hivedb.testIcebergTable").get(0).highWatermark, 52L);
// No events sent yet since the topic has not been flushed
Assert.assertEquals(eventsSent.size(), 0);