You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/03/29 21:16:40 UTC
[gobblin] branch master updated: Add/fix some fields of MetadataWriterFailureEvent (#3485)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3f22465 Add/fix some fields of MetadataWriterFailureEvent (#3485)
3f22465 is described below
commit 3f2246575e63538bccf24c83756bd45dabf82116
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Tue Mar 29 14:16:32 2022 -0700
Add/fix some fields of MetadataWriterFailureEvent (#3485)
---
.../org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java | 11 +++++++++--
.../apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java | 2 ++
.../apache/gobblin/iceberg/writer/IcebergMetadataWriter.java | 3 +--
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index 86d0b5b..4dcefe7 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -62,6 +62,7 @@ import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
@@ -142,7 +143,10 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
FileSystem.get(HadoopUtils.getConfFromState(properties))));
parallelRunnerTimeoutMills =
state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS, DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
- MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass());
+ List<Tag<?>> tags = Lists.newArrayList();
+ String clusterIdentifier = ClustersNames.getInstance().getClusterName();
+ tags.add(new Tag<>(IcebergMCEMetadataKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
+ MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
}
@@ -446,9 +450,12 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH, exception.datasetPath);
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_DB_NAME, exception.dbName);
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILURE_EVENT_TABLE_NAME, exception.tableName);
- gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition);
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, exception.GMCETopicPartition.split("-")[0]);
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition.split("-")[1]);
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, Long.toString(exception.highWatermark));
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, Long.toString(exception.lowWatermark));
+ String message = exception.getCause() == null ? exception.getMessage() : exception.getCause().getMessage();
+ gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.EXCEPTION_MESSAGE_KEY_NAME, message);
eventSubmitter.submit(gobblinTrackingEvent);
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
index a4eb3b1..f925b36 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMCEMetadataKeys.java
@@ -34,6 +34,8 @@ public class IcebergMCEMetadataKeys {
public static final String DATASET_HDFS_PATH = "datasetHdfsPath";
public static final String FAILURE_EVENT_DB_NAME = "databaseName";
public static final String FAILURE_EVENT_TABLE_NAME = "tableName";
+ public static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
+ public static final String EXCEPTION_MESSAGE_KEY_NAME = "exceptionMessage";
private IcebergMCEMetadataKeys() {
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index df5739e..54e4406 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -144,7 +144,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
- private static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
private static final String ADDED_FILES_CACHE_EXPIRING_TIME = "added.files.cache.expiring.time";
@@ -200,7 +199,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
tableCurrentWatermarkMap = new HashMap<>();
List<Tag<?>> tags = Lists.newArrayList();
String clusterIdentifier = ClustersNames.getInstance().getClusterName();
- tags.add(new Tag<>(CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
+ tags.add(new Tag<>(IcebergMCEMetadataKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
metricContext = closer.register(
GobblinMetricsRegistry.getInstance().getMetricContext(state, IcebergMetadataWriter.class, tags));
this.eventSubmitter =