You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/19 00:34:09 UTC

[gobblin] branch master updated: [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (#3696)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1548350bd [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (#3696)
1548350bd is described below

commit 1548350bdb3dedc10c66c4951684c75dc230c304
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Thu May 18 17:34:02 2023 -0700

    [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (#3696)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent
    
    * address comments
    
    ---------
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java     | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 1e88a83c7..3135a1243 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -788,7 +788,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
    * @return kafka topic name for this table
    */
   protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata) {
-    if (tableMetadata.dataOffsetRange.isPresent()) {
+    if (tableMetadata.dataOffsetRange.isPresent() && tableMetadata.dataOffsetRange.get().size() != 0) {
       String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
       //In case the topic name is not the table name or the topic name contains '-'
       return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
@@ -1011,6 +1011,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
         gobblinTrackingEvent.addMetadata(entry.getKey(), entry.getValue());
       }
     }
+    if (tableMetadata.completenessEnabled) {
+      gobblinTrackingEvent.addMetadata(COMPLETION_WATERMARK_KEY, Long.toString(tableMetadata.completionWatermark));
+    }
     eventSubmitter.submit(gobblinTrackingEvent);
   }