Posted to dev@gobblin.apache.org by "ZihanLi58 (via GitHub)" <gi...@apache.org> on 2023/06/05 17:53:01 UTC

[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

ZihanLi58 commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1218364957


##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -151,6 +160,49 @@ private double getCompletenessPercentage(String datasetName, long beginInMillis,
     return percent;
   }
 
+  private double getTotalCountCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+    Preconditions.checkNotNull(this.totalCountRefTiers);
+
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.totalCountRefTiers);
+    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   Same here



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -846,6 +863,18 @@ public void flush(String dbName, String tableName) throws IOException {
             log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
                 tableMetadata.completionWatermark, topicName));
           }
+
+          // Deal with total count completion watermark.
+          if (tableMetadata.totalCountCompletionWatermark > DEFAULT_COMPLETION_WATERMARK) {

Review Comment:
   Same here: this largely duplicates lines 854 to 862. Can we reduce the duplication?



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -962,13 +1017,27 @@ private long computeCompletenessWatermark(String catalogDbTableName, String topi
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
           ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+          if (!isTotalCountCompletenessWatermark && auditCountVerifier.get().isComplete(
+              topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
             // Also persist the watermark into State object to share this with other MetadataWriters
             // we enforce ourselves to always use lower-cased table name here
             String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
-            this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased), completionWatermark);
+            this.state.setProp(
+                String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased),
+                completionWatermark);
+            break;
+          }
+
+          if (isTotalCountCompletenessWatermark && auditCountVerifier.get().isTotalCountComplete(
+              topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+            completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
+            this.state.setProp(

Review Comment:
   Should we merge the two if blocks? The only difference seems to be the completeness property name, so it may be worth removing the duplicate code here.
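
   A rough sketch of what a merged branch could look like, under stated assumptions: pick the completeness check and the state key once, then run a single code path. Everything here is illustrative. `WatermarkMergeSketch`, `Verifier`, `CompletenessCheck`, `tryAdvance`, and both key-format strings are hypothetical stand-ins, and a plain `Map` replaces the `State` object; only the method names `isComplete` and `isTotalCountComplete` come from the diff.

```java
import java.util.Locale;
import java.util.Map;

class WatermarkMergeSketch {
  // Placeholder key formats (assumptions); the real constants live in IcebergMetadataWriter.
  static final String CLASSIC_KEY_FORMAT = "completionWatermark.%s";
  static final String TOTAL_COUNT_KEY_FORMAT = "totalCountCompletionWatermark.%s";

  /** Hypothetical stand-in for the audit count verifier; models only the two checks in the diff. */
  interface Verifier {
    boolean isComplete(String topic, long beginInMillis, long endInMillis);
    boolean isTotalCountComplete(String topic, long beginInMillis, long endInMillis);
  }

  /** Common shape of both checks, so the caller can select one up front. */
  @FunctionalInterface
  interface CompletenessCheck {
    boolean test(String topic, long beginInMillis, long endInMillis);
  }

  /**
   * Single code path for both watermark kinds.
   * Returns the new watermark, or -1 if the window is not complete yet.
   */
  static long tryAdvance(Verifier verifier, boolean isTotalCount, String topic, String catalogDbTableName,
      long lowerBoundMillis, long timestampMillis, Map<String, Long> state) {
    // The only two things that differ between the branches: the check and the key.
    CompletenessCheck check = isTotalCount ? verifier::isTotalCountComplete : verifier::isComplete;
    String keyFormat = isTotalCount ? TOTAL_COUNT_KEY_FORMAT : CLASSIC_KEY_FORMAT;

    if (!check.test(topic, lowerBoundMillis, timestampMillis)) {
      return -1L;
    }
    // Persist under the lower-cased table name, as both original branches do.
    state.put(String.format(keyFormat, catalogDbTableName.toLowerCase(Locale.ROOT)), timestampMillis);
    return timestampMillis;
  }
}
```

   With this shape, the loop body would call `tryAdvance` once and break when it returns a non-negative value, whichever watermark kind is being computed.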



##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -120,27 +140,16 @@ public boolean isComplete(String datasetName, long beginInMillis, long endInMill
    */
   private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
     Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
-    log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
-    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.refTiers);
     if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   If countsByTier is empty, will we ever reach this block?
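
   To illustrate the ordering concern, here is a minimal sketch that assumes `validateTierCounts` throws when the source tier or a reference tier has no count. That behavior is an assumption, since the body of the method is not shown in this hunk. If it holds, an empty map never gets past validation, so the empty check would need to run first. The class name, the method bodies, and the simplified ratio calculation are all hypothetical.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.Map;

class EmptyCountsOrderingSketch {
  // Assumed behavior: reject maps that lack the source tier or any reference tier.
  static void validateTierCounts(Map<String, Long> countsByTier, String srcTier, Collection<String> refTiers)
      throws IOException {
    if (!countsByTier.containsKey(srcTier)) {
      throw new IOException("Missing count for source tier " + srcTier);
    }
    for (String refTier : refTiers) {
      if (!countsByTier.containsKey(refTier)) {
        throw new IOException("Missing count for reference tier " + refTier);
      }
    }
  }

  static double completenessPercentage(Map<String, Long> countsByTier, String srcTier,
      Collection<String> refTiers, boolean returnCompleteOnNoCounts) throws IOException {
    // Check for the empty case BEFORE validating; otherwise this branch is unreachable
    // whenever validation throws on missing tiers.
    if (countsByTier.isEmpty() && returnCompleteOnNoCounts) {
      return 1.0;
    }
    validateTierCounts(countsByTier, srcTier, refTiers);

    // Simplified placeholder ratio: best source/reference ratio over non-zero reference counts.
    long srcCount = countsByTier.get(srcTier);
    double percent = -1.0;
    for (String refTier : refTiers) {
      long refCount = countsByTier.get(refTier);
      if (refCount > 0) {
        percent = Math.max(percent, (double) srcCount / refCount);
      }
    }
    return percent;
  }
}
```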



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
