You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/03/29 21:16:40 UTC

[gobblin] branch master updated: Add/fix some fields of MetadataWriterFailureEvent (#3485)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f22465  Add/fix some fields of MetadataWriterFailureEvent (#3485)
3f22465 is described below

commit 3f2246575e63538bccf24c83756bd45dabf82116
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Mar 29 14:16:32 2022 -0700

    Add/fix some fields of MetadataWriterFailureEvent (#3485)
---
 .../org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java   | 11 +++++++++--
 .../apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java |  2 ++
 .../apache/gobblin/iceberg/writer/IcebergMetadataWriter.java  |  3 +--
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 86d0b5b..4dcefe7 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -62,6 +62,7 @@ import org.apache.gobblin.metadata.DataFile;
 import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.metadata.OperationType;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.source.extractor.CheckpointableWatermark;
@@ -142,7 +143,10 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
         FileSystem.get(HadoopUtils.getConfFromState(properties))));
     parallelRunnerTimeoutMills =
         state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS, DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
-    MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass());
+    List<Tag<?>> tags = Lists.newArrayList();
+    String clusterIdentifier = ClustersNames.getInstance().getClusterName();
+    tags.add(new Tag<>(IcebergMCEMetadataKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
+    MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
     eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
   }
 
@@ -446,9 +450,12 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
     gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH, exception.datasetPath);
     gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME, exception.dbName);
     gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME, exception.tableName);
-    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition);
+    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, exception.GMCETopicPartition.split("-")[0]);
+    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition.split("-")[1]);
     gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, Long.toString(exception.highWatermark));
     gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, Long.toString(exception.lowWatermark));
+    String message = exception.getCause() == null ? exception.getMessage() : exception.getCause().getMessage();
+    gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.EXCEPTION_MESSAGE_KEY_NAME, message);
 
     eventSubmitter.submit(gobblinTrackingEvent);
   }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
index a4eb3b1..f925b36 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
@@ -34,6 +34,8 @@ public class IcebergMCEMetadataKeys {
   public static final String DATASET_HDFS_PATH = "datasetHdfsPath";
   public static final String FAILURE_EVENT_DB_NAME = "databaseName";
   public static final String FAILURE_EVENT_TABLE_NAME = "tableName";
+  public static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
+  public static final String EXCEPTION_MESSAGE_KEY_NAME = "exceptionMessage";
 
   private IcebergMCEMetadataKeys() {
   }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index df5739e..54e4406 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -144,7 +144,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
   private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
   private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
   private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
-  private static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
   private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
   private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
   private static final String ADDED_FILES_CACHE_EXPIRING_TIME = "added.files.cache.expiring.time";
@@ -200,7 +199,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     tableCurrentWatermarkMap = new HashMap<>();
     List<Tag<?>> tags = Lists.newArrayList();
     String clusterIdentifier = ClustersNames.getInstance().getClusterName();
-    tags.add(new Tag<>(CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
+    tags.add(new Tag<>(IcebergMCEMetadataKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
     metricContext = closer.register(
         GobblinMetricsRegistry.getInstance().getMetricContext(state, IcebergMetadataWriter.class, tags));
     this.eventSubmitter =