Posted to dev@gobblin.apache.org by "wsarecv (via GitHub)" <gi...@apache.org> on 2023/05/30 21:21:51 UTC

[GitHub] [gobblin] wsarecv opened a new pull request, #3701: [GOBBLIN-1838] Introduce total count based completion watermark

wsarecv opened a new pull request, #3701:
URL: https://github.com/apache/gobblin/pull/3701

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title.
       - https://issues.apache.org/jira/browse/GOBBLIN-1838
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
       - Currently, the completion watermark is determined by the completeness percentage `max of srcCount/refCount, for each refTier`. This change introduces a new "total count based completion watermark", determined by a new completeness percentage: `srcCount / sum of refCount over all refTiers`.
       - The new total count based watermark will be used to check the completeness of data that is aggregated from multiple tiers.
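
The two percentages described above can be sketched roughly as follows. This is only an illustration, not the PR's implementation: the class, method, and tier names are hypothetical, and treating a zero reference count as 0.0 is an assumption made to keep the example total.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; these names are not taken from the Gobblin code base.
public class CompletenessSketch {

  // Classic: the highest srcCount/refCount ratio across the reference tiers.
  public static double classicCompleteness(Map<String, Long> countsByTier, String srcTier,
      List<String> refTiers) {
    long srcCount = countsByTier.get(srcTier);
    double best = 0.0;
    for (String refTier : refTiers) {
      long refCount = countsByTier.get(refTier);
      // Assumption: a zero reference count contributes 0.0 instead of dividing by zero.
      double ratio = refCount == 0 ? 0.0 : (double) srcCount / refCount;
      best = Math.max(best, ratio);
    }
    return best;
  }

  // Total count: srcCount divided by the sum of the counts of all reference tiers.
  public static double totalCountCompleteness(Map<String, Long> countsByTier, String srcTier,
      List<String> refTiers) {
    long srcCount = countsByTier.get(srcTier);
    long total = 0;
    for (String refTier : refTiers) {
      total += countsByTier.get(refTier);
    }
    // Assumption: an all-zero reference total yields 0.0.
    return total == 0 ? 0.0 : (double) srcCount / total;
  }

  public static void main(String[] args) {
    Map<String, Long> counts = new HashMap<>();
    counts.put("gobblin", 90L);      // made-up source tier count
    counts.put("producerA", 100L);   // made-up reference tier counts
    counts.put("producerB", 50L);
    List<String> refTiers = Arrays.asList("producerA", "producerB");
    System.out.println(classicCompleteness(counts, "gobblin", refTiers));
    System.out.println(totalCountCompleteness(counts, "gobblin", refTiers));
  }
}
```

With these made-up counts the classic ratio is about 1.8 (driven by the smaller reference tier) while the total count ratio is about 0.6, so a threshold such as 0.99 would pass the classic check and fail the total count check.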
   
   ### Tests
   - [ ] My PR adds the following unit tests:
       - `KafkaAuditCountVerifierTest`
       - `IcebergMetadataWriterTest`
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] wsarecv commented on a diff in pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "wsarecv (via GitHub)" <gi...@apache.org>.
wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1230222909


##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -90,23 +102,52 @@ private static AuditCountClient getAuditClient(State state) {
     }
   }
 
+  public Map<CompletenessType, Boolean> calculateCompleteness(String datasetName, long beginInMillis, long endInMillis)
+      throws IOException {
+    return calculateCompleteness(datasetName, beginInMillis, endInMillis, this.threshold);
+  }
+
   /**
    * Compare source tier against reference tiers.
-   * Compute completion percentage by srcCount/refCount. Return true iff the highest percentages is greater than threshold.
+   * Compute completion percentage which is true iff the calculated percentages is greater than threshold.
    *
    * @param datasetName A dataset short name like 'PageViewEvent'
    * @param beginInMillis Unix timestamp in milliseconds
    * @param endInMillis Unix timestamp in milliseconds
    * @param threshold User defined threshold
+   *
+   * @return a map of completeness result by CompletenessType
    */
-  public boolean isComplete(String datasetName, long beginInMillis, long endInMillis, double threshold)
-      throws IOException {
-    return getCompletenessPercentage(datasetName, beginInMillis, endInMillis) > threshold;
+  public Map<CompletenessType, Boolean> calculateCompleteness(String datasetName, long beginInMillis, long endInMillis,
+      double threshold) throws IOException {
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+    log.info(String.format("checkTierCounts: audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
+    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+
+    Map<CompletenessType, Boolean> result = new HashMap<>();
+    result.put(CompletenessType.ClassicCompleteness, CalculateCompleteness(datasetName, beginInMillis, endInMillis,
+        CompletenessType.ClassicCompleteness, countsByTier) > threshold);
+    result.put(CompletenessType.TotalCountCompleteness, CalculateCompleteness(datasetName, beginInMillis, endInMillis,
+        CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
+    return result;
   }
 
-  public boolean isComplete(String datasetName, long beginInMillis, long endInMillis)
-      throws IOException {
-    return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+  private double CalculateCompleteness(String datasetName, long beginInMillis, long endInMillis, CompletenessType type,

Review Comment:
   Fixed.





[GitHub] [gobblin] vikrambohra commented on a diff in pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "vikrambohra (via GitHub)" <gi...@apache.org>.
vikrambohra commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1230152890


##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -90,23 +102,52 @@ private static AuditCountClient getAuditClient(State state) {
     }
   }
 
+  public Map<CompletenessType, Boolean> calculateCompleteness(String datasetName, long beginInMillis, long endInMillis)
+      throws IOException {
+    return calculateCompleteness(datasetName, beginInMillis, endInMillis, this.threshold);
+  }
+
   /**
    * Compare source tier against reference tiers.
-   * Compute completion percentage by srcCount/refCount. Return true iff the highest percentages is greater than threshold.
+   * Compute completion percentage which is true iff the calculated percentages is greater than threshold.
    *
    * @param datasetName A dataset short name like 'PageViewEvent'
    * @param beginInMillis Unix timestamp in milliseconds
    * @param endInMillis Unix timestamp in milliseconds
    * @param threshold User defined threshold
+   *
+   * @return a map of completeness result by CompletenessType
    */
-  public boolean isComplete(String datasetName, long beginInMillis, long endInMillis, double threshold)
-      throws IOException {
-    return getCompletenessPercentage(datasetName, beginInMillis, endInMillis) > threshold;
+  public Map<CompletenessType, Boolean> calculateCompleteness(String datasetName, long beginInMillis, long endInMillis,
+      double threshold) throws IOException {
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+    log.info(String.format("checkTierCounts: audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
+    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+
+    Map<CompletenessType, Boolean> result = new HashMap<>();
+    result.put(CompletenessType.ClassicCompleteness, CalculateCompleteness(datasetName, beginInMillis, endInMillis,
+        CompletenessType.ClassicCompleteness, countsByTier) > threshold);
+    result.put(CompletenessType.TotalCountCompleteness, CalculateCompleteness(datasetName, beginInMillis, endInMillis,
+        CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
+    return result;
   }
 
-  public boolean isComplete(String datasetName, long beginInMillis, long endInMillis)
-      throws IOException {
-    return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+  private double CalculateCompleteness(String datasetName, long beginInMillis, long endInMillis, CompletenessType type,

Review Comment:
   calculateCompleteness?





[GitHub] [gobblin] codecov-commenter commented on pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#issuecomment-1573115023

   ## [Codecov](https://app.codecov.io/gh/apache/gobblin/pull/3701?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#3701](https://app.codecov.io/gh/apache/gobblin/pull/3701?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (a435b57) into [master](https://app.codecov.io/gh/apache/gobblin/commit/51a852d506b749b9ac33568aff47105e14972a57?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (51a852d) will **increase** coverage by `7.69%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3701      +/-   ##
   ============================================
   + Coverage     46.97%   54.67%   +7.69%     
   + Complexity    10794     1383    -9411     
   ============================================
     Files          2138      278    -1860     
     Lines         84132     9965   -74167     
     Branches       9356     1078    -8278     
   ============================================
   - Hits          39518     5448   -34070     
   + Misses        41015     4009   -37006     
   + Partials       3599      508    -3091     
   ```
   
   
   [see 1860 files with indirect coverage changes](https://app.codecov.io/gh/apache/gobblin/pull/3701/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1218364957


##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -151,6 +160,49 @@ private double getCompletenessPercentage(String datasetName, long beginInMillis,
     return percent;
   }
 
+  private double getTotalCountCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+    Preconditions.checkNotNull(this.totalCountRefTiers);
+
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.totalCountRefTiers);
+    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   Same here



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -846,6 +863,18 @@ public void flush(String dbName, String tableName) throws IOException {
             log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
                 tableMetadata.completionWatermark, topicName));
           }
+
+          // Deal with total count completion watermark.
+          if (tableMetadata.totalCountCompletionWatermark > DEFAULT_COMPLETION_WATERMARK) {

Review Comment:
   Same here: this duplicates much of the code at lines 854 to 862. Can we reduce the duplication?



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -962,13 +1017,27 @@ private long computeCompletenessWatermark(String catalogDbTableName, String topi
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
           ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+          if (!isTotalCountCompletenessWatermark && auditCountVerifier.get().isComplete(
+              topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
             // Also persist the watermark into State object to share this with other MetadataWriters
             // we enforce ourselves to always use lower-cased table name here
             String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
-            this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased), completionWatermark);
+            this.state.setProp(
+                String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased),
+                completionWatermark);
+            break;
+          }
+
+          if (isTotalCountCompletenessWatermark && auditCountVerifier.get().isTotalCountComplete(
+              topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+            completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
+            this.state.setProp(

Review Comment:
   Should we merge the two if blocks? The only difference seems to be the completeness property name, so it may be worth removing the duplicate code here.
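
One way the two branches could be merged is to pick the audit check and the state key from the flag, then run a single shared body. The sketch below is only an illustration under stated assumptions: `AuditCheck`, `tryAdvance`, and both key formats are hypothetical stand-ins, and a plain `Map` replaces the Gobblin `State` object.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration of merging the two if blocks; not the PR's code.
public class WatermarkMergeSketch {

  // Stand-in for the verifier's isComplete / isTotalCountComplete calls.
  public interface AuditCheck {
    boolean isComplete(String topicName, long beginInMillis, long endInMillis);
  }

  // Hypothetical key formats; the real property names live in IcebergMetadataWriter.
  static final String CLASSIC_KEY_FORMAT = "completionWatermark.%s";
  static final String TOTAL_COUNT_KEY_FORMAT = "totalCountCompletionWatermark.%s";

  // Returns the new watermark, or the previous one if the selected check fails.
  public static long tryAdvance(boolean isTotalCount, AuditCheck classicCheck, AuditCheck totalCountCheck,
      String catalogDbTableName, String topicName, long lowerBoundMillis, long timestampMillis,
      long previousWatermark, Map<String, Long> state) {
    // The flag selects only the check and the key; everything else is shared.
    AuditCheck check = isTotalCount ? totalCountCheck : classicCheck;
    String keyFormat = isTotalCount ? TOTAL_COUNT_KEY_FORMAT : CLASSIC_KEY_FORMAT;
    if (!check.isComplete(topicName, lowerBoundMillis, timestampMillis)) {
      return previousWatermark;
    }
    // Persist under the lower-cased table name, as the original comment requires.
    String tableLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
    state.put(String.format(keyFormat, tableLowerCased), timestampMillis);
    return timestampMillis;
  }

  public static void main(String[] args) {
    Map<String, Long> state = new HashMap<>();
    long watermark = tryAdvance(true, (t, b, e) -> false, (t, b, e) -> true,
        "Db.Table", "topic", 0L, 3600000L, -1L, state);
    System.out.println(watermark + " " + state);
  }
}
```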



##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -120,27 +140,16 @@ public boolean isComplete(String datasetName, long beginInMillis, long endInMill
    */
   private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
     Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
-    log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
-    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.refTiers);
     if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   If countsByTier is empty, will we reach this block at all?
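
One reading of this question is that a validation step which rejects missing tiers would fail on an empty map before the `returnCompleteOnNoCounts` check runs. The sketch below shows the reordered logic under that reading. The throwing behavior of `validateTierCounts` is an assumption, the single reference tier and the unchecked exception are simplifications, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of check ordering; not the PR's code.
public class EmptyCountsOrderSketch {

  // Assumed behavior: validation rejects a map that lacks the source or reference tier.
  public static void validateTierCounts(Map<String, Long> countsByTier, String srcTier, String refTier) {
    if (!countsByTier.containsKey(srcTier) || !countsByTier.containsKey(refTier)) {
      throw new IllegalStateException("Missing audit count for " + srcTier + " or " + refTier);
    }
  }

  public static double completeness(Map<String, Long> countsByTier, String srcTier, String refTier,
      boolean returnCompleteOnNoCounts) {
    // Handle the "no counts at all" case first, so validation cannot pre-empt it.
    if (countsByTier.isEmpty() && returnCompleteOnNoCounts) {
      return 1.0;
    }
    validateTierCounts(countsByTier, srcTier, refTier);
    long srcCount = countsByTier.get(srcTier);
    long refCount = countsByTier.get(refTier);
    // Assumption: a zero reference count yields 0.0 instead of a division by zero.
    return refCount == 0 ? 0.0 : (double) srcCount / refCount;
  }

  public static void main(String[] args) {
    Map<String, Long> empty = new HashMap<>();
    System.out.println(completeness(empty, "gobblin", "producer", true));
  }
}
```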





[GitHub] [gobblin] wsarecv commented on a diff in pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "wsarecv (via GitHub)" <gi...@apache.org>.
wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1228473902


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -891,94 +898,40 @@ public void flush(String dbName, String tableName) throws IOException {
     }
   }
 
-  @Override
-  public void reset(String dbName, String tableName) throws IOException {
-    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+  private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName, TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate) {
+    return new CompletenessWatermarkUpdater(topicName, this.auditCheckGranularity, this.timeZone,
+        tableMetadata, propsToUpdate, this.state, this.auditCountVerifier.get());
   }
 
-  /**
-   * Update TableMetadata with the new completion watermark upon a successful audit check
-   * @param tableMetadata metadata of table
-   * @param topic topic name
-   * @param timestamps Sorted set in reverse order of timestamps to check audit counts for
-   * @param props table properties map
-   */
-  private void checkAndUpdateCompletenessWatermark(TableMetadata tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
-      Map<String, String> props) {
-    String tableName = tableMetadata.table.get().name();
-    if (topic == null) {
-      log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s",
-          TOPIC_NAME_KEY, tableName));
-    }
-    long newCompletenessWatermark =
-        computeCompletenessWatermark(tableName, topic, timestamps, tableMetadata.completionWatermark);
-    if (newCompletenessWatermark > tableMetadata.completionWatermark) {
-      log.info(String.format("Updating %s for %s to %s", COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
-          newCompletenessWatermark));
-      props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
-      props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
-      tableMetadata.completionWatermark = newCompletenessWatermark;
-    }
+  private void updateWatermarkWithFilesRegistered(String topicName, TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate, boolean includeTotalCountCompletionWatermark) {
+    getWatermarkUpdater(topicName, tableMetadata, propsToUpdate)
+        .run(tableMetadata.datePartitions, includeTotalCountCompletionWatermark);
   }
 
-  /**
-   * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit counts match
-   * for that window (aka its is set to the beginning of next window)
-   * For each timestamp in sorted collection of timestamps in descending order
-   * if timestamp is greater than previousWatermark
-   * and hour(now) > hour(prevWatermark)
-   *    check audit counts for completeness between
-   *    a source and reference tier for [timestamp -1 , timstamp unit of granularity]
-   *    If the audit count matches update the watermark to the timestamp and break
-   *    else continue
-   * else
-   *  break
-   * Using a {@link TimeIterator} that operates over a range of time in 1 unit
-   * given the start, end and granularity
-   * @param catalogDbTableName
-   * @param topicName
-   * @param timestamps a sorted set of timestamps in decreasing order
-   * @param previousWatermark previous completion watermark for the table
-   * @return updated completion watermark
-   */
-  private long computeCompletenessWatermark(String catalogDbTableName, String topicName, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
-    log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", topicName, timestamps, previousWatermark));
-    long completionWatermark = previousWatermark;
-    ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
-    try {
-      if(timestamps == null || timestamps.size() <= 0) {
-        log.error("Cannot create time iterator. Empty for null timestamps");
-        return previousWatermark;
-      }
-      TimeIterator.Granularity granularity = TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
-      ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
-          .atZone(ZoneId.of(this.timeZone));
-      ZonedDateTime startDT = timestamps.first();
-      ZonedDateTime endDT = timestamps.last();
-      TimeIterator iterator = new TimeIterator(startDT, endDT, granularity, true);
-      while (iterator.hasNext()) {
-        ZonedDateTime timestampDT = iterator.next();
-        if (timestampDT.isAfter(prevWatermarkDT)
-            && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
-          long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
-            completionWatermark = timestampMillis;
-            // Also persist the watermark into State object to share this with other MetadataWriters
-            // we enforce ourselves to always use lower-cased table name here
-            String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
-            this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased), completionWatermark);
-            break;
-          }
-        } else {
-          break;
-        }
-      }
-    } catch (IOException e) {
-      log.warn("Exception during audit count check: ", e);
+  private void updateWatermarkWithNoFilesRegistered(String topicName, TableMetadata tableMetadata,

Review Comment:
   This will be refined in the next iteration.





[GitHub] [gobblin] vikrambohra commented on a diff in pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "vikrambohra (via GitHub)" <gi...@apache.org>.
vikrambohra commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1224949023


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -891,94 +901,45 @@ public void flush(String dbName, String tableName) throws IOException {
     }
   }
 
-  @Override
-  public void reset(String dbName, String tableName) throws IOException {
-    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+  private AbstractCompletenessWatermarkUpdater getWatermarkUpdater(String topicName, TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate, boolean isTotalCountCompleteness) {
+    return isTotalCountCompleteness

Review Comment:
   Is this correct? It seems reversed to me.



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -836,15 +851,10 @@ public void flush(String dbName, String tableName) throws IOException {
         // The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
         if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
             && tableMetadata.completenessEnabled) {
-          if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
-            log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
-            SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
-            ZonedDateTime dtAtBeginningOfHour = ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
-            timestamps.add(dtAtBeginningOfHour);
-            checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
-          } else {
-            log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
-                tableMetadata.completionWatermark, topicName));
+          updateWatermarkWithEmptyFilesRegistered(topicName, tableMetadata, props, false);
+
+          if (tableMetadata.totalCountCompletenessEnabled) {
+            updateWatermarkWithEmptyFilesRegistered(topicName, tableMetadata, props, true);

Review Comment:
   updateWatermarkWithNoFilesRegistered?





[GitHub] [gobblin] wsarecv commented on a diff in pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "wsarecv (via GitHub)" <gi...@apache.org>.
wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1227756280


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -836,15 +851,10 @@ public void flush(String dbName, String tableName) throws IOException {
         // The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
         if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
             && tableMetadata.completenessEnabled) {
-          if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
-            log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
-            SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
-            ZonedDateTime dtAtBeginningOfHour = ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
-            timestamps.add(dtAtBeginningOfHour);
-            checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
-          } else {
-            log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
-                tableMetadata.completionWatermark, topicName));
+          updateWatermarkWithEmptyFilesRegistered(topicName, tableMetadata, props, false);
+
+          if (tableMetadata.totalCountCompletenessEnabled) {
+            updateWatermarkWithEmptyFilesRegistered(topicName, tableMetadata, props, true);

Review Comment:
   Fixed



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -891,94 +901,45 @@ public void flush(String dbName, String tableName) throws IOException {
     }
   }
 
-  @Override
-  public void reset(String dbName, String tableName) throws IOException {
-    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+  private AbstractCompletenessWatermarkUpdater getWatermarkUpdater(String topicName, TableMetadata tableMetadata,
+      Map<String, String> propsToUpdate, boolean isTotalCountCompleteness) {
+    return isTotalCountCompleteness

Review Comment:
   Fixed.





[GitHub] [gobblin] ZihanLi58 merged pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 merged PR #3701:
URL: https://github.com/apache/gobblin/pull/3701




[GitHub] [gobblin] wsarecv commented on a diff in pull request #3701: [GOBBLIN-1838] Introduce total count based completion watermark

Posted by "wsarecv (via GitHub)" <gi...@apache.org>.
wsarecv commented on code in PR #3701:
URL: https://github.com/apache/gobblin/pull/3701#discussion_r1222735916


##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -120,27 +140,16 @@ public boolean isComplete(String datasetName, long beginInMillis, long endInMill
    */
   private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
     Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
-    log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
-    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.refTiers);
     if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   Good catch. Fixed it and added a new unit test.



##########
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java:
##########
@@ -151,6 +160,49 @@ private double getCompletenessPercentage(String datasetName, long beginInMillis,
     return percent;
   }
 
+  private double getTotalCountCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+    Preconditions.checkNotNull(this.totalCountRefTiers);
+
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+    validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.totalCountRefTiers);
+    if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {

Review Comment:
   Refactored.



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -962,13 +1017,27 @@ private long computeCompletenessWatermark(String catalogDbTableName, String topi
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
           ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
-          if (auditCountVerifier.get().isComplete(topicName,
-              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+          if (!isTotalCountCompletenessWatermark && auditCountVerifier.get().isComplete(
+              topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
             // Also persist the watermark into State object to share this with other MetadataWriters
             // we enforce ourselves to always use lower-cased table name here
             String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
-            this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased), completionWatermark);
+            this.state.setProp(
+                String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, catalogDbTableNameLowerCased),
+                completionWatermark);
+            break;
+          }
+
+          if (isTotalCountCompletenessWatermark && auditCountVerifier.get().isTotalCountComplete(
+              topicName, auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+            completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
+            this.state.setProp(

Review Comment:
   Refactored.



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -846,6 +863,18 @@ public void flush(String dbName, String tableName) throws IOException {
             log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
                 tableMetadata.completionWatermark, topicName));
           }
+
+          // Deal with total count completion watermark.
+          if (tableMetadata.totalCountCompletionWatermark > DEFAULT_COMPLETION_WATERMARK) {

Review Comment:
   Refactored.




