You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/26 16:41:36 UTC

[gobblin] branch master updated: [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics (#3698)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 51a852d50 [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics (#3698)
51a852d50 is described below

commit 51a852d506b749b9ac33568aff47105e14972a57
Author: vikram bohra <vb...@linkedin.com>
AuthorDate: Fri May 26 09:41:29 2023 -0700

    [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics (#3698)
    
    * [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics
    
    * fixed test case
---
 .../gobblin/iceberg/writer/IcebergMetadataWriter.java      |  8 ++++----
 .../gobblin/iceberg/writer/IcebergMetadataWriterTest.java  | 14 +++++++++-----
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 3135a1243..4cfb0bea0 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -23,6 +23,7 @@ import java.time.LocalDate;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -832,15 +833,14 @@ public class IcebergMetadataWriter implements MetadataWriter {
           tableMetadata.deleteFiles.get().commit();
         }
         // Check and update completion watermark when there are no files to be registered, typically for quiet topics
-        // The logic is to check the next window from previous completion watermark and update the watermark if there are no audit counts
+        // The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
         if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
             && tableMetadata.completenessEnabled) {
           if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
             log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
             SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
-            ZonedDateTime prevWatermarkDT =
-                Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
-            timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
+            ZonedDateTime dtAtBeginningOfHour = ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
+            timestamps.add(dtAtBeginningOfHour);
             checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
           } else {
             log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 294ef08ab..bc1c8ca57 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -19,6 +19,9 @@ package org.apache.gobblin.iceberg.writer;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -492,9 +495,9 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
   @Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"}, groups={"icebergMetadataWriterTest"})
   public void testChangePropertyGMCECompleteness() throws IOException {
 
-    Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    long watermark = Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY));
-    long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1);
+    ZonedDateTime expectedCWDt = ZonedDateTime.now(ZoneId.of(DEFAULT_TIME_ZONE)).truncatedTo(ChronoUnit.HOURS);
+    // For quiet topics, watermark should always be beginning of current hour
+    long expectedWatermark = expectedCWDt.toInstant().toEpochMilli();
     File hourlyFile2 = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/11/data.avro");
     gmce.setOldFilePrefixes(null);
     gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -511,11 +514,12 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
             new LongWatermark(65L))));
 
     KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
-    Mockito.when(verifier.isComplete("testTopic", watermark, expectedWatermark)).thenReturn(true);
+    // For quiet topics always check for previous hour window
+    Mockito.when(verifier.isComplete("testTopic", expectedCWDt.minusHours(1).toInstant().toEpochMilli(), expectedWatermark)).thenReturn(true);
     ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
 
-    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-7000");
     Assert.assertEquals(table.spec().fields().get(1).name(), "late");
     Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");