Posted to dev@gobblin.apache.org by "rzhang10 (via GitHub)" <gi...@apache.org> on 2023/03/10 18:36:20 UTC

[GitHub] [gobblin] rzhang10 opened a new pull request, #3659: Add completion watermark to State

rzhang10 opened a new pull request, #3659:
URL: https://github.com/apache/gobblin/pull/3659

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-XXX
   
   
   ### Description
   This patch adds a map, `tableToCompletedPartitions`, to the `State` class. It stores table-partition completeness information in memory so that other `MetadataWriter` implementations in the framework can later retrieve it.
   
   Entries in `tableToCompletedPartitions` are removed once other MetadataWriters consume them, so the map does not grow indefinitely.
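   
   A minimal sketch of the consume-once behavior described above, assuming a standalone holder class rather than a field on `State` (`CompletedPartitionsRegistry`, `markComplete`, and `consume` are hypothetical names, not Gobblin APIs). It uses `computeIfAbsent`, so creating the set and adding to it is a single call instead of `putIfAbsent` followed by `get`:
   
   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Locale;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   
   /**
    * Hypothetical sketch: an in-memory table -> completed-partitions map whose entries
    * are removed when a consumer reads them, so the map does not grow without bound.
    */
   final class CompletedPartitionsRegistry {
     private final Map<String, Set<String>> tableToCompletedPartitions = new ConcurrentHashMap<>();
   
     /** Records a completed partition; table names are lower-cased for consistent lookups. */
     void markComplete(String table, String partition) {
       tableToCompletedPartitions
           .computeIfAbsent(table.toLowerCase(Locale.ROOT), k -> ConcurrentHashMap.newKeySet())
           .add(partition);
     }
   
     /** Returns and removes all completed partitions for the table (empty if none). */
     Set<String> consume(String table) {
       Set<String> partitions = tableToCompletedPartitions.remove(table.toLowerCase(Locale.ROOT));
       return partitions == null ? Collections.emptySet() : new HashSet<>(partitions);
     }
   
     /** Number of tables that still have unconsumed partitions. */
     int trackedTables() {
       return tableToCompletedPartitions.size();
     }
   }
   ```
   
   Because `consume` removes the entry, each table's set lives only until a downstream writer reads it.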
   
   ### Tests
   `gradle :gobblin-iceberg:test`
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1139395912


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +950,14 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
+            this.state.setProp(catalogDbTableNameLowerCased + ".watermark", completionWatermark);

Review Comment:
   Can we make `%s.completion.watermark` a static format string? Then you can easily use it to reconstruct the configuration key.
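   
   A minimal sketch of the suggestion, assuming the template `%s.completion.watermark` and using `java.util.Properties` as a stand-in for Gobblin's `State` so the example is self-contained (`CompletionWatermarkKeys` and its methods are hypothetical):
   
   ```java
   import java.util.Locale;
   import java.util.Properties;
   
   /** Hypothetical sketch of a static key template for the per-table completion watermark. */
   final class CompletionWatermarkKeys {
     // Assumed template; "%s" is replaced with the lower-cased catalog.db.table name.
     static final String COMPLETION_WATERMARK_KEY_FORMAT = "%s.completion.watermark";
   
     static String keyFor(String catalogDbTableName) {
       return String.format(Locale.ROOT, COMPLETION_WATERMARK_KEY_FORMAT,
           catalogDbTableName.toLowerCase(Locale.ROOT));
     }
   
     // Properties stands in for State here; values are stored as strings.
     static void setWatermark(Properties props, String table, long watermarkMillis) {
       props.setProperty(keyFor(table), Long.toString(watermarkMillis));
     }
   
     static long getWatermark(Properties props, String table, long defaultMillis) {
       String value = props.getProperty(keyFor(table));
       return value == null ? defaultMillis : Long.parseLong(value);
     }
   }
   ```
   
   Keeping the template in one constant means writers and readers cannot drift apart on the key layout.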





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1134559596


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java:
##########
@@ -62,16 +63,21 @@ public class State implements WritableShim {
   @Getter
   private Properties specProperties;
 
+  // This in-mem state will be used to share partition completion information across different MetadataWriter impls
+  public Map<String, Set<String>> tableToCompletedPartitions;

Review Comment:
   Let's not put this map into the `State` object: it is a very common class in Gobblin, and this map does not make sense for most of its use cases. Can you serialize the map and store it as a single property instead?
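   
   One possible encoding, sketched under the assumption that table and partition names never contain `=`, `,` or `;` (the `CompletedPartitionsCodec` class and the string layout are hypothetical, not an existing Gobblin format):
   
   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.SortedSet;
   import java.util.StringJoiner;
   import java.util.TreeMap;
   import java.util.TreeSet;
   
   /**
    * Hypothetical sketch: encode Map<table, Set<partition>> as one property value,
    * e.g. "a.b.t1=p1,p2;a.b.t2=p3". Assumes names never contain '=', ',' or ';'.
    */
   final class CompletedPartitionsCodec {
     static String encode(Map<String, Set<String>> map) {
       StringJoiner entries = new StringJoiner(";");
       // TreeMap/TreeSet give a deterministic ordering for the encoded string.
       for (Map.Entry<String, Set<String>> e : new TreeMap<>(map).entrySet()) {
         entries.add(e.getKey() + "=" + String.join(",", new TreeSet<>(e.getValue())));
       }
       return entries.toString();
     }
   
     static Map<String, SortedSet<String>> decode(String value) {
       Map<String, SortedSet<String>> result = new TreeMap<>();
       if (value == null || value.isEmpty()) {
         return result;
       }
       for (String entry : value.split(";")) {
         int eq = entry.indexOf('=');
         if (eq < 0) {
           throw new IllegalArgumentException("Malformed entry: " + entry);
         }
         SortedSet<String> partitions = new TreeSet<>();
         String list = entry.substring(eq + 1);
         if (!list.isEmpty()) {
           for (String p : list.split(",")) {
             partitions.add(p);
           }
         }
         result.put(entry.substring(0, eq), partitions);
       }
       return result;
     }
   }
   ```
   
   A JSON encoding (for example with a library such as Gson) would lift the delimiter restriction at the cost of a dependency.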



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -23,18 +23,7 @@
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;

Review Comment:
   Let's avoid wildcard (`*`) imports.



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);
+            this.state.tableToCompletedPartitions.putIfAbsent(tableNameLowerCased, Sets.newHashSet());
+            this.state.tableToCompletedPartitions.get(tableNameLowerCased).add(timestampDT.format(HOURLY_DATEPARTITION_FORMAT));

Review Comment:
   If the watermark jumps, i.e. we update it directly from 1 am to 8 am, will the completed-partition list contain only the 8 am partition and miss the partitions from 2 am to 7 am?
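   
   To make the scenario concrete, here is a sketch that lists every hourly partition in (previous watermark, new watermark], assuming a `yyyy-MM-dd-HH` partition format (the real `HOURLY_DATEPARTITION_FORMAT` may differ, and `WatermarkGapFiller` is a hypothetical name):
   
   ```java
   import java.time.ZonedDateTime;
   import java.time.format.DateTimeFormatter;
   import java.time.temporal.ChronoUnit;
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical sketch: list every hourly partition covered when the watermark jumps. */
   final class WatermarkGapFiller {
     // Assumed partition format; the real HOURLY_DATEPARTITION_FORMAT may differ.
     static final DateTimeFormatter HOURLY_PARTITION_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
   
     /** Returns partitions in (prevWatermark, newWatermark], one per hour, oldest first. */
     static List<String> partitionsBetween(ZonedDateTime prevWatermark, ZonedDateTime newWatermark) {
       List<String> partitions = new ArrayList<>();
       ZonedDateTime cursor = prevWatermark.truncatedTo(ChronoUnit.HOURS).plusHours(1);
       while (!cursor.isAfter(newWatermark)) {
         partitions.add(cursor.format(HOURLY_PARTITION_FORMAT));
         cursor = cursor.plusHours(1);
       }
       return partitions;
     }
   }
   ```
   
   For a jump from 1 am to 8 am, `partitionsBetween` yields the seven partitions 02 through 08 rather than only 08.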



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);

Review Comment:
   Do we need the db name as well?





[GitHub] [gobblin] rzhang10 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "rzhang10 (via GitHub)" <gi...@apache.org>.
rzhang10 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1136323328


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -23,18 +23,7 @@
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;

Review Comment:
   This was done automatically by my IDE. Do you have a formatter configuration file I can load into the IDE?





[GitHub] [gobblin] ZihanLi58 merged pull request #3659: Add completion watermark to State

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 merged PR #3659:
URL: https://github.com/apache/gobblin/pull/3659




[GitHub] [gobblin] rzhang10 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "rzhang10 (via GitHub)" <gi...@apache.org>.
rzhang10 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1136326361


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);
+            this.state.tableToCompletedPartitions.putIfAbsent(tableNameLowerCased, Sets.newHashSet());
+            this.state.tableToCompletedPartitions.get(tableNameLowerCased).add(timestampDT.format(HOURLY_DATEPARTITION_FORMAT));

Review Comment:
   Yes: if we use whatever is in the `state` as is, we will miss publishing the completion events for 2 am to 7 am. However, I think we could keep the previously completed partition in the state so that Jasper can compute the delta; then, upon the 8 am watermark, Jasper can publish the completion events for 2 am to 8 am together. Or do you see a better alternative?
   
   But this would mean a large SLA delay for downstream consumers. Does Gobblin guarantee an SLA on the watermark?
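   
   A sketch of the delta idea, assuming hour-aligned watermarks in epoch milliseconds and a per-table record of the last published watermark (`CompletionDeltaTracker` and `advance` are hypothetical, not part of Gobblin or Jasper):
   
   ```java
   import java.time.Duration;
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   /**
    * Hypothetical sketch: remember the last published watermark per table and, on each
    * new watermark, return every hourly watermark that has not been published yet.
    */
   final class CompletionDeltaTracker {
     private static final long HOUR_MILLIS = Duration.ofHours(1).toMillis();
     private final Map<String, Long> lastPublishedMillis = new HashMap<>();
   
     /** Returns hourly watermarks in (lastPublished, newWatermark] and records newWatermark. */
     List<Long> advance(String table, long newWatermarkMillis) {
       List<Long> delta = new ArrayList<>();
       Long last = lastPublishedMillis.get(table);
       if (last == null) {
         // No history for this table: publish only the new watermark.
         delta.add(newWatermarkMillis);
       } else {
         // Assumes watermarks are hour-aligned, so stepping by one hour stays aligned.
         for (long t = last + HOUR_MILLIS; t <= newWatermarkMillis; t += HOUR_MILLIS) {
           delta.add(t);
         }
       }
       if (last == null || newWatermarkMillis > last) {
         lastPublishedMillis.put(table, newWatermarkMillis);
       }
       return delta;
     }
   }
   ```
   
   With the last published watermark at 1 am, advancing to 8 am returns the seven hourly watermarks from 2 am through 8 am, so they could be published together.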





[GitHub] [gobblin] rzhang10 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "rzhang10 (via GitHub)" <gi...@apache.org>.
rzhang10 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1136323726


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);

Review Comment:
   This table name already includes the catalog and db names; it has the format `catalog.db.table`.
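   
   A small sketch of splitting such a name, assuming no individual identifier contains a dot (`CatalogDbTableName` is a hypothetical helper):
   
   ```java
   import java.util.Locale;
   
   /** Hypothetical sketch: split a "catalog.db.table" name into lower-cased parts. */
   final class CatalogDbTableName {
     final String catalog;
     final String db;
     final String table;
   
     private CatalogDbTableName(String catalog, String db, String table) {
       this.catalog = catalog;
       this.db = db;
       this.table = table;
     }
   
     static CatalogDbTableName parse(String name) {
       // Limit -1 keeps empty trailing parts so "a.b." is rejected rather than silently shortened.
       String[] parts = name.toLowerCase(Locale.ROOT).split("\\.", -1);
       if (parts.length != 3) {
         throw new IllegalArgumentException("Expected catalog.db.table but got: " + name);
       }
       return new CatalogDbTableName(parts[0], parts[1], parts[2]);
     }
   
     /** The db.table portion of the name. */
     String dbTableName() {
       return db + "." + table;
     }
   }
   ```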





[GitHub] [gobblin] rzhang10 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "rzhang10 (via GitHub)" <gi...@apache.org>.
rzhang10 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1139351226


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java:
##########
@@ -62,16 +63,21 @@ public class State implements WritableShim {
   @Getter
   private Properties specProperties;
 
+  // This in-mem state will be used to share partition completion information across different MetadataWriter impls
+  public Map<String, Set<String>> tableToCompletedPartitions;

Review Comment:
   The thing is, if I make it a config, it can be set manually outside the code logic, which is not what we want, right? This is just internal in-memory state.





[GitHub] [gobblin] rzhang10 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "rzhang10 (via GitHub)" <gi...@apache.org>.
rzhang10 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1139411372


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +950,14 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = catalogDbTableName.toLowerCase(Locale.ROOT);
+            this.state.setProp(catalogDbTableNameLowerCased + ".watermark", completionWatermark);

Review Comment:
   Refactored.





[GitHub] [gobblin] codecov-commenter commented on pull request #3659: Add completion watermark to State

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#issuecomment-1464244812

   ## [Codecov](https://codecov.io/gh/apache/gobblin/pull/3659?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3659](https://codecov.io/gh/apache/gobblin/pull/3659?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7e786af) into [master](https://codecov.io/gh/apache/gobblin/commit/8e8e3df03bf71ef42b0f774186fdc24288966294?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8e8e3df) will **increase** coverage by `1.64%`.
   > The diff coverage is `88.88%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3659      +/-   ##
   ============================================
   + Coverage     46.85%   48.50%   +1.64%     
   + Complexity    10745     7891    -2854     
   ============================================
     Files          2138     1478     -660     
     Lines         83962    58361   -25601     
     Branches       9327     6708    -2619     
   ============================================
   - Hits          39344    28307   -11037     
   + Misses        41037    27434   -13603     
   + Partials       3581     2620     -961     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3659?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../gobblin/iceberg/writer/IcebergMetadataWriter.java](https://codecov.io/gh/apache/gobblin/pull/3659?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1pY2ViZXJnL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ljZWJlcmcvd3JpdGVyL0ljZWJlcmdNZXRhZGF0YVdyaXRlci5qYXZh) | `72.47% <88.88%> (+0.36%)` | :arrow_up: |
   
   ... and [665 files with indirect coverage changes](https://codecov.io/gh/apache/gobblin/pull/3659/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [gobblin] rzhang10 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "rzhang10 (via GitHub)" <gi...@apache.org>.
rzhang10 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1139350061


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);
+            this.state.tableToCompletedPartitions.putIfAbsent(tableNameLowerCased, Sets.newHashSet());
+            this.state.tableToCompletedPartitions.get(tableNameLowerCased).add(timestampDT.format(HOURLY_DATEPARTITION_FORMAT));

Review Comment:
   OK, sounds good. I will talk to the team to decide how to handle gaps and watermark jumps like this.





[GitHub] [gobblin] rzhang10 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "rzhang10 (via GitHub)" <gi...@apache.org>.
rzhang10 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1136322807


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java:
##########
@@ -62,16 +63,21 @@ public class State implements WritableShim {
   @Getter
   private Properties specProperties;
 
+  // This in-mem state will be used to share partition completion information across different MetadataWriter impls
+  public Map<String, Set<String>> tableToCompletedPartitions;

Review Comment:
   My intention is to make this map an in-memory transient object. I feel we shouldn't expose this as a config key that users can access and set externally. WDYT?





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3659: Add completion watermark to State

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3659:
URL: https://github.com/apache/gobblin/pull/3659#discussion_r1137558092


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java:
##########
@@ -62,16 +63,21 @@ public class State implements WritableShim {
   @Getter
   private Properties specProperties;
 
+  // This in-mem state will be used to share partition completion information across different MetadataWriter impls
+  public Map<String, Set<String>> tableToCompletedPartitions;

Review Comment:
   The config can be set or overwritten on the fly at runtime, purely for internal use. Having a map does not make sense for most use cases of this state object, so I would suggest using the existing API.



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);
+            this.state.tableToCompletedPartitions.putIfAbsent(tableNameLowerCased, Sets.newHashSet());
+            this.state.tableToCompletedPartitions.get(tableNameLowerCased).add(timestampDT.format(HOURLY_DATEPARTITION_FORMAT));

Review Comment:
   We do guarantee the SLA in most cases, but there are exceptions, and GCNs can happen downstream that cause these delays. My suggestion is to have Jasper expose the last completed partition and have the Jasper writer compute the delta here as well. This is just a reminder and does not need to be addressed in this PR; maybe add a comment describing this scenario.



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -947,9 +938,15 @@ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime>
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist this into State object to share this with other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String tableNameLowerCased = tableName.toLowerCase(Locale.ROOT);

Review Comment:
   Then rename it to `dbTableName` to make that clearer?


