You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/19 00:34:09 UTC
[gobblin] branch master updated: [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (#3696)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1548350bd [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (#3696)
1548350bd is described below
commit 1548350bdb3dedc10c66c4951684c75dc230c304
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Thu May 18 17:34:02 2023 -0700
[GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (#3696)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent
* address comments
---------
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 1e88a83c7..3135a1243 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -788,7 +788,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
* @return kafka topic name for this table
*/
protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata) {
- if (tableMetadata.dataOffsetRange.isPresent()) {
+ if (tableMetadata.dataOffsetRange.isPresent() && tableMetadata.dataOffsetRange.get().size() != 0) {
String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
//In case the topic name is not the table name or the topic name contains '-'
return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
@@ -1011,6 +1011,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
gobblinTrackingEvent.addMetadata(entry.getKey(), entry.getValue());
}
}
+ if (tableMetadata.completenessEnabled) {
+ gobblinTrackingEvent.addMetadata(COMPLETION_WATERMARK_KEY, Long.toString(tableMetadata.completionWatermark));
+ }
eventSubmitter.submit(gobblinTrackingEvent);
}