You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2022/06/07 21:48:31 UTC

[gobblin] branch master updated: [GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (#3517)

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ef18c482b [GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (#3517)
ef18c482b is described below

commit ef18c482b241378f8912fa1944914167a61f147c
Author: vbohra <vb...@linkedin.com>
AuthorDate: Tue Jun 7 14:48:27 2022 -0700

    [GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (#3517)
    
    * [GOBBLIN-1655] Update completion watermark for quiet tables during iceberg registration
    
    * [GOBBLIN-1657] Update completion watermark on change_property GMCE
    
    * Added test case to check watermark update on change_property
    
    Co-authored-by: Vikram Bohra <vb...@vbohra-mn1.linkedin.biz>
---
 .../verifier/KafkaAuditCountVerifier.java          |  7 ++
 .../iceberg/publisher/GobblinMCEPublisher.java     |  3 +
 .../iceberg/writer/IcebergMetadataWriter.java      | 86 ++++++++++++++--------
 .../iceberg/writer/IcebergMetadataWriterTest.java  | 41 ++++++++++-
 4 files changed, 103 insertions(+), 34 deletions(-)

diff --git a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 04d49a1db..d4495f9eb 100644
--- a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -45,6 +45,8 @@ public class KafkaAuditCountVerifier {
   public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
   public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
   private static final double DEFAULT_THRESHOLD = 0.999;
+  public static final String COMPLETE_ON_NO_COUNTS = COMPLETENESS_PREFIX + "complete.on.no.counts";
+  private final boolean returnCompleteOnNoCounts;
 
   private final AuditCountClient auditCountClient;
   private final String srcTier;
@@ -67,6 +69,7 @@ public class KafkaAuditCountVerifier {
         state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
     this.srcTier = state.getProp(SOURCE_TIER);
     this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+    this.returnCompleteOnNoCounts = state.getPropAsBoolean(COMPLETE_ON_NO_COUNTS, false);
   }
 
   /**
@@ -119,6 +122,10 @@ public class KafkaAuditCountVerifier {
     Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
     log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
     countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
+      log.info(String.format("Found empty counts map for %s, returning complete", datasetName));
+      return 1.0;
+    }
     double percent = -1;
     if (!countsByTier.containsKey(this.srcTier)) {
       throw new IOException(String.format("Source tier %s audit count cannot be retrieved for dataset %s between %s and %s", this.srcTier, datasetName, beginInMillis, endInMillis));
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 8b76843af..3e58a53d1 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -98,7 +98,10 @@ public class GobblinMCEPublisher extends DataPublisher {
         // There'll be only one dummy file here. This file is parsed for DB and table name calculation.
         newFiles = computeDummyFile(state);
         if (!newFiles.isEmpty()) {
+          log.info("Dummy file: " + newFiles.keySet().iterator().next());
           this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.NONE);
+        } else {
+          log.info("No dummy file created. Not sending GMCE");
         }
       } else {
         this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index ebc633e2c..9b0b18450 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -312,6 +312,11 @@ public class IcebergMetadataWriter implements MetadataWriter {
         return;
       }
     }
+    if(tableMetadata.completenessEnabled) {
+      tableMetadata.completionWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
+          String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+    }
+
     computeCandidateSchema(gmce, tid, tableSpec);
     tableMetadata.ensureTxnInit();
     tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
@@ -322,12 +327,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
         if (gmce.getTopicPartitionOffsetsRange() != null) {
           mergeOffsets(gmce, tid);
         }
-        //compute topic name
-        if(!tableMetadata.newProperties.get().containsKey(TOPIC_NAME_KEY) &&
-            tableMetadata.dataOffsetRange.isPresent() && !tableMetadata.dataOffsetRange.get().isEmpty()) {
-          String topicPartition = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
-          tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topicPartition.substring(0, topicPartition.lastIndexOf("-")));
-        }
         break;
       }
       case rewrite_files: {
@@ -411,6 +410,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
     org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
     TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
     tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
+    String nativeName = tableMetadata.datasetName;
+    String topic = nativeName.substring(nativeName.lastIndexOf("/") + 1);
+    tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topic);
   }
 
   /**
@@ -692,8 +694,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
         StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
 
         if(tableMetadata.newPartitionColumnEnabled && gmce.getOperationType() == OperationType.add_files) {
-          tableMetadata.prevCompletenessWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
-              String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
           // Assumes first partition value to be partitioned by date
           // TODO Find better way to determine a partition value
           String datepartition = partition.get(0, null);
@@ -722,8 +722,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
   private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) {
     table = addPartitionToIcebergTable(table, newPartitionColumn, newPartitionColumnType);
     PartitionSpec partitionSpec = table.spec();
-    long prevCompletenessWatermark = tableMetadata.prevCompletenessWatermark;
-    int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition, prevCompletenessWatermark);
+    int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition, tableMetadata.completionWatermark);
     List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
     partitionValues.add(String.valueOf(late));
     return IcebergUtils.getPartition(partitionSpec.partitionType(), partitionValues);
@@ -790,28 +789,33 @@ public class IcebergMetadataWriter implements MetadataWriter {
         Transaction transaction = tableMetadata.transaction.get();
         Map<String, String> props = tableMetadata.newProperties.or(
             Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+        String topic = props.get(TOPIC_NAME_KEY);
         if (tableMetadata.appendFiles.isPresent()) {
           tableMetadata.appendFiles.get().commit();
           if (tableMetadata.completenessEnabled) {
-            String topicName = props.get(TOPIC_NAME_KEY);
-            if(topicName == null) {
-              log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
-                  TOPIC_NAME_KEY, dbName, tableName));
-            } else {
-              long newCompletenessWatermark =
-                  computeCompletenessWatermark(topicName, tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
-              if(newCompletenessWatermark > tableMetadata.prevCompletenessWatermark) {
-                log.info(String.format("Updating %s for %s.%s to %s", COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
-                props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
-                props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
-                tableMetadata.newCompletenessWatermark = newCompletenessWatermark;
-              }
-            }
+            checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
           }
         }
         if (tableMetadata.deleteFiles.isPresent()) {
           tableMetadata.deleteFiles.get().commit();
         }
+        // Check and update completion watermark when there are no files to be registered, typically for quiet topics
+        // The logic is to check the next window from previous completion watermark and update the watermark if there are no audit counts
+        if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
+            && tableMetadata.completenessEnabled) {
+          if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
+            log.info(String.format("Checking kafka audit for %s on change_property ", topic));
+            SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
+            ZonedDateTime prevWatermarkDT =
+                Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
+            timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
+            checkAndUpdateCompletenessWatermark(tableMetadata, topic, timestamps, props);
+          } else {
+            log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
+                tableMetadata.completionWatermark, topic));
+          }
+        }
+
         //Set high waterMark
         Long highWatermark = tableCurrentWatermarkMap.get(tid);
         props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
@@ -835,7 +839,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
         }
         //Update schema(commit)
         updateSchema(tableMetadata, props, topicName);
-
         //Update properties
         UpdateProperties updateProperties = transaction.updateProperties();
         props.forEach(updateProperties::set);
@@ -850,7 +853,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
         submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
 
         //Reset the table metadata for next accumulation period
-        tableMetadata.reset(currentProps, highWatermark, tableMetadata.newCompletenessWatermark);
+        tableMetadata.reset(currentProps, highWatermark);
         log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
       } else {
         log.info("There's no transaction initiated for the table {}", tid.toString());
@@ -869,6 +872,30 @@ public class IcebergMetadataWriter implements MetadataWriter {
       this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
   }
 
+  /**
+   * Computes the completion watermark for the given timestamps and, if it is newer, records it in props and tableMetadata
+   * @param tableMetadata metadata of table; its completionWatermark is the baseline and is overwritten when it advances
+   * @param topic topic name; when null an error is logged and no audit check is performed
+   * @param timestamps Sorted set of timestamps to check audit counts for (datePartitions is kept in reverse order)
+   * @param props table properties map; receives the new completion watermark and its timezone when it advances
+   */
+  private void checkAndUpdateCompletenessWatermark(TableMetadata tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
+      Map<String, String> props) {
+    if (topic == null) {
+      log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s",
+          TOPIC_NAME_KEY, tableMetadata.table.get().name()));
+      return; // honor the message above: without a topic name the audit check must not run
+    }
+    long newCompletenessWatermark = computeCompletenessWatermark(topic, timestamps, tableMetadata.completionWatermark);
+    if (newCompletenessWatermark > tableMetadata.completionWatermark) {
+      log.info(String.format("Updating %s for %s to %s", COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
+          newCompletenessWatermark));
+      props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
+      props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
+      tableMetadata.completionWatermark = newCompletenessWatermark;
+    }
+  }
+
   /**
    * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit counts match
    * for that window (aka its is set to the beginning of next window)
@@ -1085,8 +1112,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
     Optional<String> lastSchemaVersion = Optional.absent();
     Optional<Long> lowWatermark = Optional.absent();
-    long prevCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
-    long newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+    long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
     SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
 
     @Setter
@@ -1131,7 +1157,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
       }
     }
 
-    void reset(Map<String, String> props, Long lowWaterMark, long newCompletionWatermark) {
+    void reset(Map<String, String> props, Long lowWaterMark) {
       this.lastProperties = Optional.of(props);
       this.lastSchemaVersion = Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
       this.transaction = Optional.absent();
@@ -1148,8 +1174,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
       this.newProperties = Optional.absent();
       this.lowestGMCEEmittedTime = Long.MAX_VALUE;
       this.lowWatermark = Optional.of(lowWaterMark);
-      this.prevCompletenessWatermark = newCompletionWatermark;
-      this.newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
       this.datePartitions.clear();
     }
   }
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index da564fa3d..5e2e3d00d 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -423,12 +423,12 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
     // Test when completeness watermark = -1 bootstrap case
     KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
-    Mockito.when(verifier.isComplete("testTopic", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+    Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
     ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     //completeness watermark = "2020-09-16-10"
-    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
+    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis));
 
@@ -475,7 +475,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(60L))));
 
-    Mockito.when(verifier.isComplete("testTopic", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+    Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis1));
@@ -486,6 +486,41 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
   }
 
+  @Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"}, groups={"icebergMetadataWriterTest"})
+  public void testChangePropertyGMCECompleteness() throws IOException {
+    // Writes a change_property GMCE, flushes, and expects the completion watermark table property to move forward by one hour.
+    Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    long watermark = Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY)); // watermark currently stored on the table
+    long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1); // end of the one-hour window the mocked verifier reports complete
+    File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
+    gmce.setOldFilePrefixes(null);
+    gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+        .setFilePath(hourlyFile2.toString())
+        .setFileFormat("avro")
+        .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+        .build()));
+    gmce.setOperationType(OperationType.change_property);
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "6000-7000").build());
+    GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(65L))));
+
+    KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
+    Mockito.when(verifier.isComplete("testIcebergTable", watermark, expectedWatermark)).thenReturn(true); // only this window is stubbed as complete
+    ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
+    gobblinMCEWriterWithCompletness.flush();
+
+    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0)); // reload to read the properties written by flush()
+    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-7000");
+    Assert.assertEquals(table.spec().fields().get(1).name(), "late");
+    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable");
+    Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
+    Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(expectedWatermark));
+
+  }
+
   private String writeRecord(File file) throws IOException {
     GenericData.Record record = new GenericData.Record(avroDataSchema);
     record.put("id", 1L);