Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/05/05 17:27:47 UTC

[gobblin] branch master updated: [GOBBLIN-1637] Add writer, operation, and partition info to failed metadata writer events (#3498)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ffcab02df [GOBBLIN-1637] Add writer, operation, and partition info to failed metadata writer events (#3498)
ffcab02df is described below

commit ffcab02df0fca192d25cbd8be14201d9c97f2c91
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu May 5 10:27:42 2022 -0700

    [GOBBLIN-1637] Add writer, operation, and partition info to failed metadata writer events (#3498)
    
    * Add writer, operation, and partition info to failed metadata writer events
    
    * Add partitionKeys to failure event
    
    * Add all failed writers to event
---
 .../gobblin/hive/writer/HiveMetadataWriter.java    |  30 +++++-
 ...veMetadataWriterWithPartitionInfoException.java |  27 ++---
 .../gobblin/iceberg/writer/GobblinMCEWriter.java   | 111 ++++++++++++++-------
 .../iceberg/writer/GobblinMetadataException.java   |  21 +++-
 .../iceberg/writer/IcebergMCEMetadataKeys.java     |   5 +
 .../iceberg/writer/IcebergMetadataWriterTest.java  |   4 +-
 6 files changed, 135 insertions(+), 63 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 5ed0d47cf..7a3595184 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -26,12 +26,16 @@ import com.google.common.io.Closer;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
@@ -114,8 +118,12 @@ public class HiveMetadataWriter implements MetadataWriter {
       for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) {
         try {
           execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-          throw new RuntimeException("Error when getting the result of registration for table" + tableKey, e);
+        } catch (TimeoutException e) {
+          // Since a TimeoutException should always be a transient issue, throw a RuntimeException, which will fail/retry the container
+          throw new RuntimeException("Timeout waiting for result of registration for table " + tableKey, e);
+        } catch (InterruptedException | ExecutionException e) {
+          Set<String> partitions = executionMap.keySet().stream().flatMap(List::stream).collect(Collectors.toSet());
+          throw new HiveMetadataWriterWithPartitionInfoException(partitions, Collections.emptySet(), e);
         }
       }
       executionMap.clear();
@@ -243,7 +251,7 @@ public class HiveMetadataWriter implements MetadataWriter {
       try {
         executionMap.get(partitionValue).get(timeOutSeconds, TimeUnit.SECONDS);
       } catch (InterruptedException | ExecutionException | TimeoutException e) {
-        log.error("Error when getting the result of registration for table" + tableKey);
+        log.error("Error when getting the result of registration for table " + tableKey);
         throw new RuntimeException(e);
       }
     }
@@ -326,13 +334,27 @@ public class HiveMetadataWriter implements MetadataWriter {
     GobblinMetadataChangeEvent gmce =
         (GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
     if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
-      write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
+      try {
+        write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
+      } catch (IOException e) {
+        throw new HiveMetadataWriterWithPartitionInfoException(getPartitionValues(newSpecsMap), getPartitionValues(oldSpecsMap), e);
+      }
     } else {
       log.debug(String.format("Skip table %s.%s since it's not selected", tableSpec.getTable().getDbName(),
           tableSpec.getTable().getTableName()));
     }
   }
 
+  /**
+   * Extract a unique set of partition values, as strings, from a map of HiveSpecs.
+   */
+  public Set<String> getPartitionValues(Map<String, Collection<HiveSpec>> specMap) {
+    Set<HiveSpec> hiveSpecs = specMap.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+    Set<List<String>> partitionValueLists = hiveSpecs.stream().filter(spec -> spec.getPartition().isPresent())
+        .map(spec -> spec.getPartition().get().getValues()).collect(Collectors.toSet());
+    return partitionValueLists.stream().flatMap(List::stream).collect(Collectors.toSet());
+  }
+
   @Override
   public void close() throws IOException {
     this.closer.close();
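
For illustration, here is a minimal, self-contained sketch of the flattening the new getPartitionValues() helper performs, using plain value lists in place of HiveSpec (all names and values below are hypothetical):

    import java.util.*;
    import java.util.stream.Collectors;

    public class PartitionValueFlattenSketch {
      public static void main(String[] args) {
        // Stand-in for Map<String, Collection<HiveSpec>>: each "spec" is reduced
        // to its list of partition values.
        Map<String, Collection<List<String>>> specMap = new HashMap<>();
        specMap.put("newSpecs", Arrays.asList(
            Arrays.asList("2022-05-05", "00"),
            Arrays.asList("2022-05-05", "01")));
        specMap.put("oldSpecs", Collections.singletonList(
            Arrays.asList("2022-05-04", "23")));

        // Same two-step flattening as getPartitionValues(): specs -> partition
        // value lists -> individual values, deduplicated into a Set.
        Set<String> values = specMap.values().stream()
            .flatMap(Collection::stream)
            .flatMap(List::stream)
            .collect(Collectors.toSet());

        System.out.println(values); // e.g. [00, 01, 23, 2022-05-04, 2022-05-05]
      }
    }

Note that values from every spec are merged into one flat set, which is what the failure event ultimately reports.
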
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriterWithPartitionInfoException.java
similarity index 51%
copy from gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
copy to gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriterWithPartitionInfoException.java
index 90f1e6379..1b0b2c47d 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriterWithPartitionInfoException.java
@@ -15,26 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.iceberg.writer;
+package org.apache.gobblin.hive.writer;
 
 import java.io.IOException;
+import java.util.Set;
 
 
-public class GobblinMetadataException extends IOException {
-  public String datasetPath;
-  public String dbName;
-  public String tableName;
-  public String GMCETopicPartition;
-  public long highWatermark;
-  public long lowWatermark;
-  public Exception exception;
-  GobblinMetadataException(String datasetPath, String dbName, String tableName, String GMCETopicPartition, long lowWatermark, long highWatermark, Exception exception) {
-    super(String.format("failed to flush table %s, %s", dbName, tableName), exception);
-    this.datasetPath = datasetPath;
-    this.dbName = dbName;
-    this.tableName = tableName;
-    this.GMCETopicPartition = GMCETopicPartition;
-    this.highWatermark = highWatermark;
-    this.lowWatermark = lowWatermark;
+public class HiveMetadataWriterWithPartitionInfoException extends IOException {
+  public Set<String> addedPartitionValues;
+  public Set<String> droppedPartitionValues;
+
+  HiveMetadataWriterWithPartitionInfoException(Set<String> addedPartitionValues, Set<String> droppedPartitionValues, Exception exception) {
+    super(exception);
+    this.addedPartitionValues = addedPartitionValues;
+    this.droppedPartitionValues = droppedPartitionValues;
   }
 }
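
The new exception is a plain IOException carrier for the in-flight partition values. A minimal usage sketch (assuming it runs in the org.apache.gobblin.hive.writer package, since the constructor is package-private; all values are illustrative):

    package org.apache.gobblin.hive.writer;

    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class PartitionInfoExceptionSketch {
      public static void main(String[] args) {
        Set<String> added = new HashSet<>(Collections.singleton("2022-05-05"));
        try {
          // Wrap the root cause so callers can recover which partitions were in flight
          throw new HiveMetadataWriterWithPartitionInfoException(
              added, Collections.emptySet(), new IOException("registration failed"));
        } catch (HiveMetadataWriterWithPartitionInfoException e) {
          System.out.println("failed to add: " + e.addedPartitionValues);
          System.out.println("failed to drop: " + e.droppedPartitionValues);
        }
      }
    }
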
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index f9bb21c97..d0510977b 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -57,9 +57,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.HiveMetadataWriterWithPartitionInfoException;
 import org.apache.gobblin.hive.writer.MetadataWriter;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metadata.DataFile;
@@ -109,13 +111,14 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   List<MetadataWriter> metadataWriters;
   Map<String, TableStatus> tableOperationTypeMap;
   @Getter
-  Map<String, Map<String, GobblinMetadataException>> datasetErrorMap;
+  Map<String, Map<String, List<GobblinMetadataException>>> datasetErrorMap;
   Set<String> acceptedClusters;
   protected State state;
   private final ParallelRunner parallelRunner;
   private int parallelRunnerTimeoutMills;
   private Map<String, Cache<String, Collection<HiveSpec>>> oldSpecsMaps;
   private Map<String, Cache<String, Collection<HiveSpec>>> newSpecsMaps;
+  private Map<String, List<HiveRegistrationUnit.Column>> partitionKeysMap;
   private Closer closer = Closer.create();
   protected final AtomicLong recordCount = new AtomicLong(0L);
   @Setter
@@ -136,6 +139,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
     oldSpecsMaps = new HashMap<>();
     metadataWriters = new ArrayList<>();
     datasetErrorMap = new HashMap<>();
+    partitionKeysMap = new HashMap<>();
     acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());
     state = properties;
     maxErrorDataset = state.getPropAsInt(GMCE_METADATA_WRITER_MAX_ERROR_DATASET, DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET);
@@ -281,6 +285,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
+      partitionKeysMap.put(tableString, spec.getTable().getPartitionKeys());
       if (!tableOperationTypeMap.containsKey(tableString)) {
         tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
             gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(),
@@ -331,7 +336,7 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
         } catch (Exception e) {
           meetException = true;
           writer.reset(dbName, tableName);
-          addOrThrowException(e, tableString, dbName, tableName);
+          addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
         }
       }
     }
@@ -355,15 +360,26 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
         .collect(Collectors.toList());
   }
 
-  private void addOrThrowException(Exception e, String tableString, String dbName, String tableName) throws IOException{
+  private void addOrThrowException(Exception e, String tableString, String dbName, String tableName, List<String> failedWriters) throws IOException {
     TableStatus tableStatus = tableOperationTypeMap.get(tableString);
-    Map<String, GobblinMetadataException> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
-    if (tableErrorMap.containsKey(tableString)) {
-      tableErrorMap.get(tableString).highWatermark = tableStatus.gmceHighWatermark;
+    Map<String, List<GobblinMetadataException>> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
+    GobblinMetadataException lastException = null;
+    if (tableErrorMap.containsKey(tableString) && !tableErrorMap.get(tableString).isEmpty()) {
+      lastException = tableErrorMap.get(tableString).get(tableErrorMap.get(tableString).size() - 1);
     } else {
-      GobblinMetadataException gobblinMetadataException =
-          new GobblinMetadataException(tableStatus.datasetPath, dbName, tableName, tableStatus.gmceTopicPartition, tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
-      tableErrorMap.put(tableString, gobblinMetadataException);
+      tableErrorMap.put(tableString, new ArrayList<>());
+    }
+    // If the operationType has changed, add a new exception to the list, so that each failure event represents an offset range whose GMCEs all share the same operation type
+    if (lastException != null && lastException.operationType.equals(tableStatus.operationType)) {
+      lastException.highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      lastException = new GobblinMetadataException(tableStatus.datasetPath, dbName, tableName, tableStatus.gmceTopicPartition,
+          tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, failedWriters, tableStatus.operationType, partitionKeysMap.get(tableString), e);
+      tableErrorMap.get(tableString).add(lastException);
+    }
+    if (e instanceof HiveMetadataWriterWithPartitionInfoException) {
+      lastException.addedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException) e).addedPartitionValues);
+      lastException.droppedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException) e).droppedPartitionValues);
     }
     this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap);
     tableOperationTypeMap.remove(tableString);
@@ -391,18 +407,20 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
         } catch (IOException e) {
           meetException = true;
           writer.reset(dbName, tableName);
-          addOrThrowException(e, tableString, dbName, tableName);
+          addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
         }
       }
     }
-    String datasetPath = tableOperationTypeMap.get(tableString).datasetPath;
-    if (!meetException && datasetErrorMap.containsKey(datasetPath) && datasetErrorMap.get(datasetPath).containsKey(tableString)) {
-      // We only want to emit GTE when the table watermark moves. There can be two scenario that watermark move, one is after one flush interval,
-      // we commit new watermark to state store, anther is here, where during the flush interval, we flush table because table operation changes.
-      // Under this condition, error map contains this dataset means we met error before this flush, but this time when flush succeed and
-      // the watermark inside the table moves, so we want to emit GTE to indicate there is some data loss here
-      submitFailureEvent(datasetErrorMap.get(datasetPath).get(tableString));
-      this.datasetErrorMap.get(datasetPath).remove(tableString);
+    if (!meetException) {
+      String datasetPath = tableOperationTypeMap.get(tableString).datasetPath;
+      if (datasetErrorMap.containsKey(datasetPath) && datasetErrorMap.get(datasetPath).containsKey(tableString)) {
+        // We only want to emit a GTE when the table watermark moves. There are two scenarios in which the watermark moves: one is after a flush
+        // interval, when we commit the new watermark to the state store; the other is here, where during the flush interval we flush the table
+        // because the table operation changed. In that case, the error map containing this dataset means we hit an error before this flush, but
+        // this flush succeeded and the table watermark moved, so we emit a GTE to indicate there is some data loss here
+        submitFailureEvents(datasetErrorMap.get(datasetPath).get(tableString));
+        this.datasetErrorMap.get(datasetPath).remove(tableString);
+      }
     }
   }
 
@@ -428,9 +446,9 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
     tableOperationTypeMap.clear();
     recordCount.lazySet(0L);
     // Emit events for all current errors, since the GMCE watermark will be advanced
-    for (Map.Entry<String, Map<String, GobblinMetadataException>> entry : datasetErrorMap.entrySet()) {
-      for (GobblinMetadataException exception : entry.getValue().values()) {
-        submitFailureEvent(exception);
+    for (Map.Entry<String, Map<String, List<GobblinMetadataException>>> entry : datasetErrorMap.entrySet()) {
+      for (List<GobblinMetadataException> exceptionList : entry.getValue().values()) {
+        submitFailureEvents(exceptionList);
       }
       entry.getValue().clear();
     }
@@ -483,23 +501,40 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   }
 
   /**
-   * Submit event indicating that a specific set of GMCEs have been skipped, so there is a gap in the registration
+   * Submit events indicating that a specific set of GMCEs has been skipped, so there is a gap in the registration
    */
-  private void submitFailureEvent(GobblinMetadataException exception) {
-    log.warn(String.format("Sending GTE to indicate table flush failure for %s.%s", exception.dbName, exception.tableName));
-
-    GobblinEventBuilder gobblinTrackingEvent = new GobblinEventBuilder(IcebergMCEMetadataKeys.METADATA_WRITER_FAILURE_EVENT);
-
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH, exception.datasetPath);
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME, exception.dbName);
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME, exception.tableName);
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, exception.GMCETopicPartition.split("-")[0]);
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition.split("-")[1]);
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, Long.toString(exception.highWatermark));
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, Long.toString(exception.lowWatermark));
-    String message = exception.getCause() == null ? exception.getMessage() : exception.getCause().getMessage();
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.EXCEPTION_MESSAGE_KEY_NAME, message);
-
-    eventSubmitter.submit(gobblinTrackingEvent);
+  private void submitFailureEvents(List<GobblinMetadataException> exceptionList) {
+    if (exceptionList.isEmpty()) {
+      return;
+    }
+    log.warn(String.format("Sending GTEs to indicate table flush failure for %s.%s", exceptionList.get(0).dbName, exceptionList.get(0).tableName));
+
+    for (GobblinMetadataException exception : exceptionList) {
+      GobblinEventBuilder gobblinTrackingEvent = new GobblinEventBuilder(IcebergMCEMetadataKeys.METADATA_WRITER_FAILURE_EVENT);
+
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH, exception.datasetPath);
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME, exception.dbName);
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME, exception.tableName);
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, exception.GMCETopicPartition.split("-")[0]);
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition.split("-")[1]);
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, Long.toString(exception.highWatermark));
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, Long.toString(exception.lowWatermark));
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILED_WRITERS_KEY, Joiner.on(',').join(exception.failedWriters));
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.OPERATION_TYPE_KEY, exception.operationType.toString());
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.ADDED_PARTITION_VALUES_KEY, Joiner.on(',').join(exception.addedPartitionValues));
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DROPPED_PARTITION_VALUES_KEY, Joiner.on(',').join(exception.droppedPartitionValues));
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.PARTITION_KEYS, Joiner.on(',').join(exception.partitionKeys.stream()
+          .map(HiveRegistrationUnit.Column::getName).collect(Collectors.toList())));
+
+      String message = exception.getCause() == null ? exception.getMessage() : exception.getCause().getMessage();
+      gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.EXCEPTION_MESSAGE_KEY_NAME, message);
+
+      eventSubmitter.submit(gobblinTrackingEvent);
+    }
+  }
+
+  private List<String> getFailedWriterList(MetadataWriter failedWriter) {
+    List<MetadataWriter> failedWriters = metadataWriters.subList(metadataWriters.indexOf(failedWriter), metadataWriters.size());
+    return failedWriters.stream().map(writer -> writer.getClass().getName()).collect(Collectors.toList());
   }
 }
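
The grouping behavior of addOrThrowException() is the core of this change: repeated failures for the same table extend the last error's watermark range while the operation type is unchanged, and start a new error otherwise. A simplified, self-contained sketch of that logic (Err and the string operation types are stand-ins, not Gobblin classes):

    import java.util.ArrayList;
    import java.util.List;

    public class FailureGroupingSketch {
      static class Err {
        final String operationType;
        long lowWatermark, highWatermark;
        Err(String op, long low, long high) { operationType = op; lowWatermark = low; highWatermark = high; }
      }

      // Mirrors addOrThrowException(): extend the last error while the operation
      // type is unchanged, otherwise append a new one, so each failure event
      // covers an offset range with a single operation type.
      static void record(List<Err> errors, String op, long low, long high) {
        if (!errors.isEmpty() && errors.get(errors.size() - 1).operationType.equals(op)) {
          errors.get(errors.size() - 1).highWatermark = high;
        } else {
          errors.add(new Err(op, low, high));
        }
      }

      public static void main(String[] args) {
        List<Err> errors = new ArrayList<>();
        record(errors, "add_files", 50, 51);
        record(errors, "add_files", 52, 52);   // merged: same operation type
        record(errors, "drop_files", 53, 53);  // new operation type -> new error
        errors.forEach(e ->
            System.out.println(e.operationType + ": [" + e.lowWatermark + ", " + e.highWatermark + "]"));
      }
    }
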
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
index 90f1e6379..98f10e4ce 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMetadataException.java
@@ -18,6 +18,12 @@
 package org.apache.gobblin.iceberg.writer;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.metadata.OperationType;
 
 
 public class GobblinMetadataException extends IOException {
@@ -27,8 +33,14 @@ public class GobblinMetadataException extends IOException {
   public String GMCETopicPartition;
   public long highWatermark;
   public long lowWatermark;
-  public Exception exception;
-  GobblinMetadataException(String datasetPath, String dbName, String tableName, String GMCETopicPartition, long lowWatermark, long highWatermark, Exception exception) {
+  public List<String> failedWriters;
+  public OperationType operationType;
+  public Set<String> addedPartitionValues;
+  public Set<String> droppedPartitionValues;
+  public List<HiveRegistrationUnit.Column> partitionKeys;
+
+  GobblinMetadataException(String datasetPath, String dbName, String tableName, String GMCETopicPartition, long lowWatermark, long highWatermark,
+      List<String> failedWriters, OperationType operationType, List<HiveRegistrationUnit.Column> partitionKeys, Exception exception) {
     super(String.format("failed to flush table %s, %s", dbName, tableName), exception);
     this.datasetPath = datasetPath;
     this.dbName = dbName;
@@ -36,5 +48,10 @@ public class GobblinMetadataException extends IOException {
     this.GMCETopicPartition = GMCETopicPartition;
     this.highWatermark = highWatermark;
     this.lowWatermark = lowWatermark;
+    this.failedWriters = failedWriters;
+    this.operationType = operationType;
+    this.addedPartitionValues = new HashSet<>();
+    this.droppedPartitionValues = new HashSet<>();
+    this.partitionKeys = partitionKeys;
   }
 }
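
A construction sketch for the extended exception (assuming the org.apache.gobblin.iceberg.writer package, since the constructor is package-private, and assuming add_files is a valid constant of the GMCE OperationType enum; all other values are illustrative):

    package org.apache.gobblin.iceberg.writer;

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.gobblin.metadata.OperationType;

    public class GobblinMetadataExceptionSketch {
      public static void main(String[] args) {
        GobblinMetadataException e = new GobblinMetadataException(
            "/data/testDB/testIcebergTable", "hivedb", "testIcebergTable",
            "GobblinMetadataChangeEvent-0", 50L, 52L,
            Arrays.asList("org.apache.gobblin.hive.writer.HiveMetadataWriter"),
            OperationType.add_files, Collections.emptyList(),
            new RuntimeException("flush failed"));
        // The partition-value sets start empty; addOrThrowException() fills them
        // in when the cause is a HiveMetadataWriterWithPartitionInfoException.
        e.addedPartitionValues.add("2022-05-05");
        System.out.println(e.getMessage() + ", added=" + e.addedPartitionValues);
      }
    }
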
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
index f925b3689..631bfcaa9 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
@@ -36,6 +36,11 @@ public class IcebergMCEMetadataKeys {
   public static final String FAILURE_EVENT_TABLE_NAME = "tableName";
   public static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
   public static final String EXCEPTION_MESSAGE_KEY_NAME = "exceptionMessage";
+  public static final String FAILED_WRITERS_KEY = "failedWriters";
+  public static final String OPERATION_TYPE_KEY = "operationType";
+  public static final String ADDED_PARTITION_VALUES_KEY = "failedToAddPartitionValues";
+  public static final String DROPPED_PARTITION_VALUES_KEY = "failedToDropPartitionValues";
+  public static final String PARTITION_KEYS = "partitionKeys";
 
   private IcebergMCEMetadataKeys() {
   }
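
With the new keys, the failure GTE's metadata map gains entries like the following (key names come from this class; the values shown are purely illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.gobblin.iceberg.writer.IcebergMCEMetadataKeys;

    public class FailureEventMetadataSketch {
      public static void main(String[] args) {
        Map<String, String> metadata = new LinkedHashMap<>();
        metadata.put(IcebergMCEMetadataKeys.FAILED_WRITERS_KEY,
            "org.apache.gobblin.hive.writer.HiveMetadataWriter");
        metadata.put(IcebergMCEMetadataKeys.OPERATION_TYPE_KEY, "add_files");
        metadata.put(IcebergMCEMetadataKeys.ADDED_PARTITION_VALUES_KEY, "2022-05-05,00");
        metadata.put(IcebergMCEMetadataKeys.DROPPED_PARTITION_VALUES_KEY, "");
        metadata.put(IcebergMCEMetadataKeys.PARTITION_KEYS, "datepartition");
        metadata.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }
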
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 288d689ea..43fd821b4 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -365,10 +365,10 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 1);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
         .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
-        .get("hivedb.testIcebergTable").lowWatermark, 50L);
+        .get("hivedb.testIcebergTable").get(0).lowWatermark, 50L);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
         .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
-        .get("hivedb.testIcebergTable").highWatermark, 52L);
+        .get("hivedb.testIcebergTable").get(0).highWatermark, 52L);
 
     // No events sent yet since the topic has not been flushed
     Assert.assertEquals(eventsSent.size(), 0);