Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/18 16:48:34 UTC

[GitHub] [iceberg] dixingxing0 opened a new pull request #2109: Store watermark as iceberg table's property

dixingxing0 opened a new pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109


   Fixes https://github.com/apache/iceberg/issues/2108.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-763329566


   @dixingxing0 yeah. Extending the `FlinkSink.Builder` sounds like the right approach to me.
   
   Small suggestion on the naming:
   - storeWatermarkEnabled -> storeWatermark
   - flink.watermark-for-default -> flink.watermark.default
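   To make the suggested names concrete, here is a minimal sketch of how the option and the resulting property key might fit together. `WatermarkOptions`, `storeWatermark(boolean)`, `watermarkName(String)` and `watermarkKey()` are hypothetical names for illustration; they are not the actual `FlinkSink.Builder` API.

   ```java
   import java.util.Objects;

   // Hypothetical stand-in for the options a FlinkSink.Builder extension might carry.
   final class WatermarkOptions {
     // Prefix follows the suggested naming: flink.watermark.<name>
     static final String WATERMARK_KEY_PREFIX = "flink.watermark.";

     private boolean storeWatermark = false;   // off unless explicitly enabled
     private String watermarkName = "default"; // suffix used in the property key

     WatermarkOptions storeWatermark(boolean enabled) {
       this.storeWatermark = enabled;
       return this;
     }

     WatermarkOptions watermarkName(String name) {
       this.watermarkName = Objects.requireNonNull(name, "watermark name");
       return this;
     }

     boolean isStoreWatermark() {
       return storeWatermark;
     }

     // Full key under which a committer would record the watermark, e.g. flink.watermark.default
     String watermarkKey() {
       return WATERMARK_KEY_PREFIX + watermarkName;
     }
   }
   ```

   With these assumptions, `new WatermarkOptions().storeWatermark(true).watermarkName("us-east-1").watermarkKey()` would return `flink.watermark.us-east-1`.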



[GitHub] [iceberg] zhangjun0x01 edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-769551567


   Through this PR, I had an idea: could we expose a `Callback` method to the user? After each successful commit, the `Callback` method would be called asynchronously, and the user could do some work in it, such as sending some information to Kafka and then consuming the Kafka data to perform follow-up operations, like triggering downstream batch jobs. What do you think? @rdblue 
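   A rough sketch of what such a hook could look like, assuming a hypothetical `CommitCallback` interface and a `CommitNotifier` helper (neither exists in Iceberg). Each callback is submitted to a caller-supplied `Executor` only after the commit has succeeded, so a slow callback does not block the committer.

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CopyOnWriteArrayList;
   import java.util.concurrent.Executor;

   // Hypothetical user-facing hook; not part of Iceberg's API.
   interface CommitCallback {
     void onCommitted(long checkpointId, long snapshotId, Long watermark);
   }

   // Hypothetical helper a committer could use to notify callbacks after a successful commit.
   final class CommitNotifier {
     private final List<CommitCallback> callbacks = new CopyOnWriteArrayList<>();
     private final Executor executor;

     CommitNotifier(Executor executor) {
       this.executor = executor;
     }

     void register(CommitCallback callback) {
       callbacks.add(callback);
     }

     // Call only after the commit succeeded. Each callback runs on the executor, so one that
     // publishes to Kafka, for example, does not hold up the committer thread.
     CompletableFuture<Void> notifyCommitted(long checkpointId, long snapshotId, Long watermark) {
       CompletableFuture<?>[] futures = callbacks.stream()
           .map(cb -> CompletableFuture.runAsync(
               () -> cb.onCommitted(checkpointId, snapshotId, watermark), executor))
           .toArray(CompletableFuture[]::new);
       return CompletableFuture.allOf(futures);
     }
   }
   ```

   A failing callback makes the returned future complete exceptionally; whether the committer should log or ignore that is an open design question.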



[GitHub] [iceberg] dixingxing0 edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762686286


   > @dixingxing0 can you describe the motivation of checkpointing the watermarks in Flink state?
   > 
   > Ryan described our use of watermarks in snapshot metadata. They are used to indicate the data completeness on the ingestion path so that downstream batch consumer jobs can be triggered when data is complete for a window (like hourly).
   
   Thanks @stevenzwu. Regarding the watermark state, I am just following the current restore behavior:
   ```java
         NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
             .newTreeMap(checkpointsState.get().iterator().next())
             .tailMap(maxCommittedCheckpointId, false);
         if (!uncommittedDataFiles.isEmpty()) {
         // Commit all uncommitted data files from the old Flink job to the Iceberg table.
           long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
           commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
         }
   ```
   Since Flink will commit the last uncommitted checkpoint on restore, I think we should also store the correct watermark for that checkpoint.
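   A minimal sketch of that idea, assuming watermarks are kept in a `NavigableMap` keyed by checkpoint id, just like the uncommitted data files above; `WatermarkRestore` and `watermarkToCommit` are hypothetical names. On restore, the watermark to write is the one recorded for the highest checkpoint after `maxCommittedCheckpointId`.

   ```java
   import java.util.NavigableMap;

   // Hypothetical sketch: one watermark per checkpoint, keyed the same way as the data files,
   // so the restore path can pick the watermark matching the checkpoint it commits.
   final class WatermarkRestore {
     private WatermarkRestore() {}

     // Returns the watermark recorded for the highest uncommitted checkpoint, or null when
     // nothing newer than maxCommittedCheckpointId was recorded.
     static Long watermarkToCommit(NavigableMap<Long, Long> watermarkPerCheckpoint,
                                   long maxCommittedCheckpointId) {
       NavigableMap<Long, Long> uncommitted =
           watermarkPerCheckpoint.tailMap(maxCommittedCheckpointId, false);
       return uncommitted.isEmpty() ? null : uncommitted.lastEntry().getValue();
     }
   }
   ```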
   
   Our use case is exactly the same as the one you and @rdblue described, except that we don't have multiple writers 😄.



[GitHub] [iceberg] stevenzwu commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560642527



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -296,6 +318,13 @@ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int
 
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails.
+
+    Long watermarkForCheckpoint = watermarkPerCheckpoint.get(checkpointId);

Review comment:
       We need to use a table transaction here so that `operation.commit()` and `table.updatePropertie...commit()` are atomic. This may require a bigger refactoring of the code, though.
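   In Iceberg, the usual way to group these updates is a transaction: `Table.newTransaction()`, then `newAppend()` and `updateProperties()` on the transaction (each committed), then `commitTransaction()`. Since that needs a real table, the sketch below only models the atomicity requirement in plain Java: the new snapshot and the watermark property are applied to a copy of the base state and published with one `compareAndSet`, retrying if another writer got there first. `TableMetadataState` and `AtomicTableModel` are hypothetical; this is loosely analogous to Iceberg's optimistic commit, not its implementation.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicReference;

   // Immutable view of "table metadata": committed snapshots plus table properties.
   final class TableMetadataState {
     final List<String> snapshots;
     final Map<String, String> properties;

     TableMetadataState(List<String> snapshots, Map<String, String> properties) {
       this.snapshots = Collections.unmodifiableList(new ArrayList<>(snapshots));
       this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
     }
   }

   // Hypothetical model: both changes are built on one base state and published together.
   final class AtomicTableModel {
     private final AtomicReference<TableMetadataState> current = new AtomicReference<>(
         new TableMetadataState(Collections.emptyList(), Collections.emptyMap()));

     TableMetadataState current() {
       return current.get();
     }

     void commitSnapshotWithProperty(String snapshot, String key, String value) {
       while (true) {
         TableMetadataState base = current.get();
         List<String> snapshots = new ArrayList<>(base.snapshots);
         snapshots.add(snapshot);
         Map<String, String> props = new HashMap<>(base.properties);
         props.put(key, value);
         if (current.compareAndSet(base, new TableMetadataState(snapshots, props))) {
           return; // snapshot and property became visible together
         }
         // Another commit won the race: rebuild on the new base and try again.
       }
     }
   }
   ```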





[GitHub] [iceberg] dixingxing0 commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560947820



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -305,6 +334,14 @@ public void processElement(StreamRecord<WriteResult> element) {
     this.writeResultsOfCurrentCkpt.add(element.getValue());
   }
 
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    super.processWatermark(mark);
+    if (mark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp()) {

Review comment:
       As you said before, we use the watermark to indicate data completeness on the ingestion path, so I think we do not need to store `MAX_WATERMARK` when the Flink job runs in streaming mode.
   If the Flink job runs in batch mode, even if we store one `MAX_WATERMARK`, we still can't know which partition is complete; I think in batch mode we can simply rely on the scheduling system. I'm not sure how `MAX_WATERMARK` would be used, so I just ignore it 😁.
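   For comparison, both behaviors can be expressed as one switch. The sketch assumes a hypothetical `WatermarkFilter` helper and relies on Flink's `Watermark.MAX_WATERMARK` using the timestamp `Long.MAX_VALUE`: with `recordEndOfInput` false it skips `MAX_WATERMARK` as this PR does, and with it true it keeps the end-of-input signal.

   ```java
   // Hypothetical helper deciding which watermarks the committer records.
   final class WatermarkFilter {
     // Timestamp carried by Flink's Watermark.MAX_WATERMARK, which marks the end of input.
     static final long END_OF_INPUT = Long.MAX_VALUE;

     private WatermarkFilter() {}

     // recordEndOfInput == false reproduces this PR's behavior (skip MAX_WATERMARK);
     // true keeps it so readers can tell that a bounded input has finished.
     static boolean shouldRecord(long watermarkTimestamp, boolean recordEndOfInput) {
       return recordEndOfInput || watermarkTimestamp != END_OF_INPUT;
     }
   }
   ```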





[GitHub] [iceberg] stevenzwu commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r561086752



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -296,6 +318,13 @@ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int
 
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails.
+
+    Long watermarkForCheckpoint = watermarkPerCheckpoint.get(checkpointId);

Review comment:
       We actually [set it as table properties too](https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L555).
   
   I think table properties are easier for the workflow scheduler (in the batch system) to query. Otherwise, it has to iterate over the snapshots to find the latest watermarks for all 3 regions. cc @rdblue 
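   As an illustration of why this is convenient, a scheduler could treat a window as complete once every writer's watermark has reached the window end, i.e. the minimum across regions. The sketch assumes one property per region named `flink.watermark.<region>` holding epoch milliseconds; that key layout and `WindowReadiness` are hypothetical.

   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical readiness check a batch scheduler could run against table properties.
   final class WindowReadiness {
     private WindowReadiness() {}

     static boolean isWindowComplete(Map<String, String> tableProperties,
                                     List<String> regions,
                                     long windowEndMillis) {
       for (String region : regions) {
         String value = tableProperties.get("flink.watermark." + region);
         if (value == null || Long.parseLong(value) < windowEndMillis) {
           return false; // a missing or lagging writer means the window may still get data
         }
       }
       return true; // every region's watermark has reached the window end
     }
   }
   ```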





[GitHub] [iceberg] dixingxing0 edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762679911


   > @dixingxing0, in our implementation, we store watermarks in snapshot summary metadata. I think that's a more appropriate place for it because it is metadata about the snapshot that is produced. We also use a watermark per writer because we write in 3 different AWS regions. So I think it would make sense to be able to name each watermark, possibly with a default if you choose not to name it.
   
   Thanks @rdblue, I agree with you.
   Since we will store the watermark in the snapshot summary metadata, we should also consider the rewrite action: currently it loses the extra properties in the summary metadata, e.g. `flink.max-committed-checkpoint-id` and `flink.job-id`.
   I think we should copy the extra properties from the current snapshot to `RewriteFiles` (the new snapshot), but I am not sure this would work as expected. After all, Flink will continuously produce new snapshots, and I'm not sure how Iceberg will resolve the conflict, so I'll do some tests first.
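   One possible shape for that copy, assuming only `flink.`-prefixed keys should be carried over and that values set by the rewrite itself must not be overwritten. `FlinkSummaryCarryOver` is a hypothetical helper; the resulting entries could then be applied to the `RewriteFiles` update through `SnapshotUpdate.set(key, value)`.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical helper: carry Flink-specific summary entries from the previous snapshot into
   // the rewrite snapshot's summary without overwriting entries the rewrite sets itself.
   final class FlinkSummaryCarryOver {
     private FlinkSummaryCarryOver() {}

     static Map<String, String> carryOver(Map<String, String> previousSummary,
                                          Map<String, String> rewriteSummary) {
       Map<String, String> result = new HashMap<>(rewriteSummary);
       for (Map.Entry<String, String> e : previousSummary.entrySet()) {
         // Assumption: only flink.* keys (e.g. flink.job-id) should survive a rewrite.
         if (e.getKey().startsWith("flink.")) {
           result.putIfAbsent(e.getKey(), e.getValue());
         }
       }
       return result;
     }
   }
   ```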
   
   About naming the watermark, I think we can introduce a new configuration, `flink.watermark-name`:
   ```properties
   # User-specified configuration (defaults shown)
   flink.store-watermark=false
   flink.watermark-name=default
   
   # Written by the Flink file committer, using flink.watermark-name as the key suffix
   flink.watermark-for-default=the-watermark
   ```
   
   @rdblue, what do you think?



[GitHub] [iceberg] stevenzwu commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560642527



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -296,6 +318,13 @@ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int
 
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails.
+
+    Long watermarkForCheckpoint = watermarkPerCheckpoint.get(checkpointId);

Review comment:
       We need to use a table transaction here so that `operation.commit()` and `table.updatePropertie...commit()` are atomic.





[GitHub] [iceberg] dixingxing0 commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-763480600


   > @dixingxing0 yeah. Extending the `FlinkSink.Builder` sounds like the right approach to me.
   > 
   > Small suggestion on the naming:
   > 
   > * storeWatermarkEnabled -> storeWatermark
   > * flink.watermark-for-default -> flink.watermark.default
   
   Thanks, I will address it.



[GitHub] [iceberg] dixingxing0 commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-763311317


   > @dixingxing0 thx a lot for the additional context. That is very helpful. I left a few comments.
   > 
   > Regarding the scenario of multiple writer jobs and a single table, I am afraid that the additional config won't help because we are talking about one table here.
   > 
   > Somehow, we need to allow a provider to supply the suffix for the watermark property key. For us, the suffix is the AWS region. I am not sure what the cleanest way to achieve that is. We could define a provider class config and use reflection to instantiate it, but I am hesitant to use reflection, as it is impossible to pass dependencies to a reflection-instantiated class.
   
   @stevenzwu, thanks for the review and comments!
   
   As you described, we cannot configure the watermark name suffix as a table property for multiple writers 😁. How about we introduce new fields in `FlinkSink.Builder` to configure the watermark name suffix? This should work for multiple writers:
   ```java
   // New fields introduced in org.apache.iceberg.flink.sink.FlinkSink.Builder
   private boolean storeWatermarkEnabled = false;
   private String watermarkNameSuffix = "default";
   
   // Iceberg table property or snapshot summary entry written by the Flink file committer,
   // using watermarkNameSuffix as the key suffix:
   //   flink.watermark-for-default=the-watermark
   ```



[GitHub] [iceberg] kbendick commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r559870212



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -126,9 +137,16 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
+    Map<String, String> properties = this.table.properties();
+    this.currentWatermark = PropertyUtil.propertyAsLong(properties, CURRENT_WATERMARK, -1L);
+    this.storeWatermark = PropertyUtil.propertyAsBoolean(properties, STORE_WATERMARK, false);
+
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+    this.watermarkState = context.getOperatorStateStore().getListState(WATERMARK_DESCRIPTOR);
+
     if (context.isRestored()) {
+      watermarkPerCheckpoint.putAll(watermarkState.get().iterator().next());

Review comment:
       Will this be backwards compatible for ongoing streaming jobs that don't have any `watermarkState` when they restore? For example, ongoing streaming jobs that are upgraded to a version of Iceberg that includes this patch?
   
   Looking at the Flink `AppendingState` interface, it says that calling `.get()` should return `null` if the state is empty. Also, the value of `restoredFlinkJobId`, obtained below by calling `jobIdState.get().iterator().next()`, is checked for null or empty.
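   A defensive version of the restore step could look like the sketch below: `watermarkState.get()` would be passed in, and missing state (`null`, no elements, or a `null` element) yields an empty map instead of an exception from `iterator().next()`. `WatermarkStateRestore` is a hypothetical helper, not code from this PR.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.Map;

   // Hypothetical helper for restoring the per-checkpoint watermark map from list state.
   // Jobs checkpointed by an older version have no such state, so every "missing" shape
   // maps to an empty result.
   final class WatermarkStateRestore {
     private WatermarkStateRestore() {}

     static Map<Long, Long> restore(Iterable<Map<Long, Long>> stateValues) {
       if (stateValues == null) {
         return Collections.emptyMap();
       }
       Iterator<Map<Long, Long>> it = stateValues.iterator();
       if (!it.hasNext()) {
         return Collections.emptyMap();
       }
       Map<Long, Long> first = it.next();
       if (first == null) {
         return Collections.emptyMap();
       }
       return new HashMap<>(first);
     }
   }
   ```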





[GitHub] [iceberg] dixingxing0 edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762679911


   > @dixingxing0, in our implementation, we store watermarks in snapshot summary metadata. I think that's a more appropriate place for it because it is metadata about the snapshot that is produced. We also use a watermark per writer because we write in 3 different AWS regions. So I think it would make sense to be able to name each watermark, possibly with a default if you choose not to name it.
   
   Thanks @rdblue, I agree with you.
   Since we will store the watermark in the snapshot summary metadata, we should also consider the rewrite action: currently it loses the extra properties in the summary metadata, e.g. `flink.max-committed-checkpoint-id` and `flink.job-id`.
   I think we should copy the extra properties from the current snapshot to `RewriteFiles` (the new snapshot).
   
   About naming the watermark, I think we can introduce a new configuration, `flink.watermark-name`:
   ```properties
   # User-specified configuration (defaults shown)
   flink.store-watermark=false
   flink.watermark-name=default
   
   # Written by the Flink file committer, using flink.watermark-name as the key suffix
   flink.watermark-for-default=the-watermark
   ```
   
   @rdblue, what do you think?



[GitHub] [iceberg] dixingxing0 edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762686286


   > @dixingxing0 can you describe the motivation of checkpointing the watermarks in Flink state?
   > 
   > Ryan described our use of watermarks in snapshot metadata. They are used to indicate the data completeness on the ingestion path so that downstream batch consumer jobs can be triggered when data is complete for a window (like hourly).
   
   Thanks @stevenzwu. Regarding the watermark state, I am just following the current restore behavior:
   ```java
         NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
             .newTreeMap(checkpointsState.get().iterator().next())
             .tailMap(maxCommittedCheckpointId, false);
         if (!uncommittedDataFiles.isEmpty()) {
         // Commit all uncommitted data files from the old Flink job to the Iceberg table.
           long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
           commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
         }
   ```
   Since Flink will commit the last uncommitted checkpoint on restore, I think we should also store the correct watermark for that checkpoint.
   
   Our use case is exactly the same as the one you and @rdblue described, except that we don't have multiple writers 😄.



[GitHub] [iceberg] stevenzwu edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-763299851


   @dixingxing0 thx a lot for the additional context. That is very helpful. I left a few comments.
   
   Regarding the scenario of multiple writer jobs and a single table, I am afraid that the additional config won't help because we are talking about one table here.
   
   Somehow, we need to allow a provider to supply the suffix for the watermark property key. For us, the suffix is the AWS region. I am not sure what the cleanest way to achieve that is. We could define a provider class config and use reflection to instantiate it, but I am hesitant to use reflection, as it is impossible to pass dependencies to a reflection-instantiated class.
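   One way to avoid reflection, sketched here under assumptions, is to accept a provider instance on the builder. Flink serializes user functions and operators when it submits a job, so the provider has to be `Serializable`; a lambda targeting a `Serializable` functional interface qualifies and can capture whatever it needs. `WatermarkSuffixProvider` and `WatermarkKeyFactory` are hypothetical names.

   ```java
   import java.io.Serializable;
   import java.util.function.Supplier;

   // Hypothetical provider type: Serializable so it can travel with the Flink operator,
   // and a Supplier so callers can pass a lambda that captures its own configuration.
   interface WatermarkSuffixProvider extends Serializable, Supplier<String> {}

   // Hypothetical component that builds the watermark property key from the provided suffix.
   final class WatermarkKeyFactory implements Serializable {
     private static final long serialVersionUID = 1L;
     private final WatermarkSuffixProvider suffixProvider;

     WatermarkKeyFactory(WatermarkSuffixProvider suffixProvider) {
       this.suffixProvider = suffixProvider;
     }

     // e.g. flink.watermark.us-east-1 when the provider returns "us-east-1"
     String watermarkKey() {
       return "flink.watermark." + suffixProvider.get();
     }
   }
   ```

   For example, `new WatermarkKeyFactory(() -> region)`, where `region` is a local `String`, builds keys such as `flink.watermark.us-east-1` without any reflective instantiation.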



[GitHub] [iceberg] stevenzwu commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560641566



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -106,6 +114,9 @@
   // All pending checkpoints states for this function.
   private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
   private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+  private static final ListStateDescriptor<Map<Long, Long>> WATERMARK_DESCRIPTOR = new ListStateDescriptor<>(
+      "iceberg-flink-watermark", new MapTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
+  private transient ListState<Map<Long, Long>> watermarkState;

Review comment:
       Should we define a `MetaData` class to hold all checkpointed metadata fields, so that we don't have to define a new state for each case?
   
   Ideally, I would prefer the metadata and the manifest file bundled in a single class (per checkpoint). That would add the complexity of handling state schema evolution, and I am not sure it is worth the effort.
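   A minimal sketch of such a holder, assuming one instance per checkpoint; `CheckpointMetadata` is a hypothetical name, and only the watermark is modeled. New metadata would become new fields here instead of new `ListStateDescriptor`s, which is exactly where the state schema evolution question comes in.

   ```java
   import java.io.Serializable;
   import java.util.Objects;

   // Hypothetical per-checkpoint metadata holder stored under a single state descriptor.
   final class CheckpointMetadata implements Serializable {
     private static final long serialVersionUID = 1L;

     private final long checkpointId;
     private final Long watermark; // null when no watermark was seen for this checkpoint

     CheckpointMetadata(long checkpointId, Long watermark) {
       this.checkpointId = checkpointId;
       this.watermark = watermark;
     }

     long checkpointId() {
       return checkpointId;
     }

     Long watermark() {
       return watermark;
     }

     @Override
     public boolean equals(Object o) {
       if (this == o) {
         return true;
       }
       if (!(o instanceof CheckpointMetadata)) {
         return false;
       }
       CheckpointMetadata other = (CheckpointMetadata) o;
       return checkpointId == other.checkpointId && Objects.equals(watermark, other.watermark);
     }

     @Override
     public int hashCode() {
       return Objects.hash(checkpointId, watermark);
     }
   }
   ```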





[GitHub] [iceberg] dixingxing0 commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762679911


   > @dixingxing0, in our implementation, we store watermarks in snapshot summary metadata. I think that's a more appropriate place for it because it is metadata about the snapshot that is produced. We also use a watermark per writer because we write in 3 different AWS regions. So I think it would make sense to be able to name each watermark, possibly with a default if you choose not to name it.
   
   Thanks @rdblue, I agree with you.
   Since we will store the watermark in the snapshot summary metadata, we should also consider the rewrite action: currently it loses the extra properties in the summary metadata, e.g. `flink.max-committed-checkpoint-id` and `flink.job-id`.
   I think we should copy the extra properties from the newest rewritten snapshot to `RewriteFiles` (the new snapshot).
   
   About naming the watermark, I think we can introduce a new configuration, `flink.watermark-name`:
   ```properties
   # User-specified configuration (defaults shown)
   flink.store-watermark=false
   flink.watermark-name=default
   
   # Written by the Flink file committer, using flink.watermark-name as the key suffix
   flink.watermark-for-default=the-watermark
   ```
   
   @rdblue, what do you think?



[GitHub] [iceberg] rdblue commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762493245


   @dixingxing0, in our implementation, we store watermarks in snapshot summary metadata. I think that's a more appropriate place for it because it is metadata about the snapshot that is produced. We also use a watermark per writer because we write in 3 different AWS regions. So I think it would make sense to be able to name each watermark, possibly with a default if you choose not to name it.



[GitHub] [iceberg] rdblue commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762493308


   FYI @stevenzwu 



[GitHub] [iceberg] zhangjun0x01 commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-769551567


   Through this PR, I had an idea: could we expose a `Callback` method to the user? After each successful commit, the `Callback` method would be called asynchronously, and the user could do some work in it, such as sending some information to Kafka and then consuming the Kafka data to perform follow-up operations, without constantly checking the watermark. What do you think? @rdblue 



[GitHub] [iceberg] stevenzwu commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560643694



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -305,6 +334,14 @@ public void processElement(StreamRecord<WriteResult> element) {
     this.writeResultsOfCurrentCkpt.add(element.getValue());
   }
 
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    super.processWatermark(mark);
+    if (mark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp()) {

Review comment:
       Why do we need to ignore `MAX_WATERMARK`? It signals the end of input.






[GitHub] [iceberg] dixingxing0 edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762679911


   > @dixingxing0, in our implementation, we store watermarks in snapshot summary metadata. I think that's a more appropriate place for it because it is metadata about the snapshot that is produced. We also use a watermark per writer because we write in 3 different AWS regions. So I think it would make sense to be able to name each watermark, possibly with a default if you choose not to name it.
   
   Thanks @rdblue, agree with you.
   Since we will store the watermark in snapshot summary metadata, we should also consider the rewrite action: currently it loses the extra properties in the summary metadata, e.g. `flink.max-committed-checkpoint-id` and `flink.job-id`.
   I think we should introduce a table property, `snapshot-summary-inheritance.enabled`; if it is set to true, `newSnapshot` will use the extra properties in `oldSnapshot` as defaults.
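A sketch of the proposed inheritance rule, assuming the caller passes the set of "extra" keys to carry over (for example `flink.job-id` and `flink.max-committed-checkpoint-id`), so that per-snapshot statistics such as added-file counts are never inherited. `SummaryInheritance` and `inherit` are hypothetical names; only the `snapshot-summary-inheritance.enabled` flag comes from the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class SummaryInheritance {
  private SummaryInheritance() {}

  // Builds the new snapshot's summary: inherited extra properties first,
  // then the new snapshot's own properties, which win on conflicts.
  static Map<String, String> inherit(
      boolean inheritanceEnabled,
      Map<String, String> oldSummary,
      Map<String, String> newSummary,
      Set<String> extraKeys) {
    Map<String, String> result = new HashMap<>();
    if (inheritanceEnabled) {
      for (String key : extraKeys) {
        String value = oldSummary.get(key);
        if (value != null) {
          result.put(key, value);
        }
      }
    }
    result.putAll(newSummary);
    return result;
  }
}
```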
   
   To name the watermark, I think we can introduce a new configuration, `flink.watermark-name`:
   ```properties
   # user-specified configuration (defaults shown)
   flink.store-watermark=false
   flink.watermark-name=default
   
   # written by the Flink files committer; the key uses flink.watermark-name as its suffix
   flink.watermark-for-default=the-watermark
   ```
   
   @rdblue what do you think?
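The naming scheme can be sketched as a key builder plus a reader, assuming watermarks are stored as epoch-millisecond strings in the snapshot summary. `NamedWatermarks` is a hypothetical helper; the `flink.watermark-for-` prefix and the `default` name follow the proposed configuration.

```java
import java.util.Map;
import java.util.OptionalLong;

final class NamedWatermarks {
  static final String DEFAULT_NAME = "default";
  static final String KEY_PREFIX = "flink.watermark-for-";

  private NamedWatermarks() {}

  // Summary key for a watermark name; an unset name falls back to "default".
  static String keyFor(String watermarkName) {
    String name = (watermarkName == null || watermarkName.isEmpty()) ? DEFAULT_NAME : watermarkName;
    return KEY_PREFIX + name;
  }

  // Reads a named watermark from a snapshot summary, if this snapshot recorded one.
  static OptionalLong read(Map<String, String> summary, String watermarkName) {
    String value = summary.get(keyFor(watermarkName));
    return value == null ? OptionalLong.empty() : OptionalLong.of(Long.parseLong(value));
  }
}
```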




[GitHub] [iceberg] stevenzwu commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-769542927


   Using Flink watermarks is definitely a very reasonable approach.
   
   We actually implemented this with a slightly different way of calculating the watermark. Instead of using the Flink watermark, we add some additional metadata (min, max, sum, count) per DataFile for the timestamp column. In the committer, we use the min of the per-file mins to decide the watermark value. We never regress the watermark value. That metadata also helps us calculate ingestion-latency metrics (commit time - event/Kafka time), like min, max, and avg.
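A sketch of the file-statistics approach, assuming each data file reports min, max, sum, and count for its event-timestamp column in epoch milliseconds. `FileStatsWatermark` and `TimestampStats` are hypothetical names, not Iceberg classes.

```java
import java.util.List;

final class FileStatsWatermark {
  // Hypothetical per-file stats for the event-timestamp column (epoch millis).
  static final class TimestampStats {
    final long min;
    final long max;
    final long sum;
    final long count;

    TimestampStats(long min, long max, long sum, long count) {
      this.min = min;
      this.max = max;
      this.sum = sum;
      this.count = count;
    }
  }

  private long watermark = Long.MIN_VALUE;

  // Moves the watermark to the min of the per-file mins, but never backwards.
  long advance(List<TimestampStats> committedFiles) {
    if (committedFiles.isEmpty()) {
      return watermark; // nothing committed, keep the previous value
    }
    long minOfMins = Long.MAX_VALUE;
    for (TimestampStats stats : committedFiles) {
      minOfMins = Math.min(minOfMins, stats.min);
    }
    watermark = Math.max(watermark, minOfMins); // never regress
    return watermark;
  }

  // Record-weighted average ingestion latency: commit time minus the mean event time.
  static double avgLatencyMillis(List<TimestampStats> files, long commitTimeMillis) {
    double totalSum = 0; // accumulate in double to avoid overflowing long across many files
    long totalCount = 0;
    for (TimestampStats stats : files) {
      totalSum += stats.sum;
      totalCount += stats.count;
    }
    return totalCount == 0 ? 0.0 : commitTimeMillis - totalSum / totalCount;
  }
}
```

Taking the min of the per-file mins makes the watermark conservative: one file with old events holds it back, which is the safe direction when triggering downstream batch jobs.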




[GitHub] [iceberg] dixingxing0 commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560028159



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -126,9 +137,16 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
+    Map<String, String> properties = this.table.properties();
+    this.currentWatermark = PropertyUtil.propertyAsLong(properties, CURRENT_WATERMARK, -1L);
+    this.storeWatermark = PropertyUtil.propertyAsBoolean(properties, STORE_WATERMARK, false);
+
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+    this.watermarkState = context.getOperatorStateStore().getListState(WATERMARK_DESCRIPTOR);
+
     if (context.isRestored()) {
+      watermarkPerCheckpoint.putAll(watermarkState.get().iterator().next());

Review comment:
       Thanks @kbendick, I neglected backwards compatibility: the current code will raise a `java.util.NoSuchElementException` when `watermarkState` is empty. I will fix it, and I will also log whether the watermark state is restored, since restores are not a high-frequency event.
   BTW, `watermarkPerCheckpoint` is an instance of `HashMap`; I think the variable name misled you here :smile:.
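One way to make the restore path tolerate state written before this change, assuming `restored` is what `watermarkState.get()` returns (possibly empty, and defensively treated as possibly `null`). `WatermarkStateRestore` is a hypothetical helper; it uses `java.util.logging` only to stay self-contained, whereas the committer would use its existing logger.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.logging.Logger;

final class WatermarkStateRestore {
  private static final Logger LOG = Logger.getLogger(WatermarkStateRestore.class.getName());

  private WatermarkStateRestore() {}

  static Map<Long, Long> restore(Iterable<Map<Long, Long>> restored) {
    Map<Long, Long> watermarkPerCheckpoint = new HashMap<>();
    Iterator<Map<Long, Long>> it = (restored == null) ? null : restored.iterator();
    if (it != null && it.hasNext()) {
      Map<Long, Long> state = it.next();
      if (state != null) {
        watermarkPerCheckpoint.putAll(state); // HashMap#putAll would throw on null
      }
      LOG.info("Restored watermarks for " + watermarkPerCheckpoint.size() + " checkpoint(s)");
    } else {
      // State from a job that ran before watermarks were checkpointed: start empty.
      LOG.info("No watermark state to restore");
    }
    return watermarkPerCheckpoint;
  }
}
```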






[GitHub] [iceberg] stevenzwu commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762504937


   @dixingxing0 can you describe the motivation of checkpointing the watermarks in Flink state? 
   
   Ryan described our use of watermarks in snapshot metadata. They are used to indicate the data completeness on the ingestion path so that downstream batch consumer jobs can be triggered when data is complete for a window (like hourly).
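A sketch of how a downstream scheduler might decide from a committed watermark that an hourly window is complete. It assumes Flink's event-time convention, where a window [start, end) is ready once the watermark reaches end - 1; `HourlyCompleteness` is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class HourlyCompleteness {
  private HourlyCompleteness() {}

  // The hour containing anyInstantInHour is complete once the watermark
  // reaches that hour's last millisecond (end - 1).
  static boolean isHourComplete(Instant anyInstantInHour, long watermarkMillis) {
    Instant hourStart = anyInstantInHour.truncatedTo(ChronoUnit.HOURS);
    long lastMillisOfHour = hourStart.plus(Duration.ofHours(1)).toEpochMilli() - 1;
    return watermarkMillis >= lastMillisOfHour;
  }
}
```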




[GitHub] [iceberg] dixingxing0 commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560706345



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -296,6 +318,13 @@ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int
 
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails.
+
+    Long watermarkForCheckpoint = watermarkPerCheckpoint.get(checkpointId);

Review comment:
       If we store the watermark in snapshot summary metadata as @rdblue said, we can omit the `table.updateProperties()` transaction.
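A sketch of writing the per-checkpoint watermark into the snapshot summary instead of making a separate table-properties commit. In Iceberg, `SnapshotUpdate#set(String, String)` adds a summary property and has to be called before `commit()`; here a `BiConsumer` stands in for it (for example `operation::set`) so the example runs without Iceberg. `WatermarkSummaryWriter` is a hypothetical name; the key follows the naming proposed in this thread.

```java
import java.util.Map;
import java.util.function.BiConsumer;

final class WatermarkSummaryWriter {
  static final String WATERMARK_KEY = "flink.watermark-for-default";

  private WatermarkSummaryWriter() {}

  // Adds the watermark recorded for checkpointId to the pending snapshot's summary.
  // Returns false if no watermark was recorded for that checkpoint.
  static boolean setWatermark(
      Map<Long, Long> watermarkPerCheckpoint, long checkpointId, BiConsumer<String, String> summarySetter) {
    Long watermark = watermarkPerCheckpoint.get(checkpointId);
    if (watermark == null) {
      return false;
    }
    summarySetter.accept(WATERMARK_KEY, Long.toString(watermark));
    return true;
  }
}
```

Because the property is part of the same snapshot commit, the watermark and the data files it describes become visible atomically.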






[GitHub] [iceberg] liubo1022126 commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
liubo1022126 commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-898322236


   > We actually implemented this in a slightly different way of calculating the watermark. Instead of using Flink watermark, we add some additional metadata (min, max, sum, count) per DataFile for the timestamp column. In the committer, we use the min of min to decide the watermark value. We never regress the watermark value. Those metadata can also help us calculate metrics for ingestion latency (commit time - event/Kafka time): like min, max, avg.
   > 
   > Just to share, by no means that I am suggesting changing the approach in this PR. It is perfectly good.
   
   Thanks @stevenzwu @rdblue, that sounds great! We also need to embed the Iceberg table, which we treat as a real-time table, into our workflow. Is there any doc or patch for your implementation?




[GitHub] [iceberg] dixingxing0 commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560706345



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -296,6 +318,13 @@ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int
 
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails.
+
+    Long watermarkForCheckpoint = watermarkPerCheckpoint.get(checkpointId);

Review comment:
       Yes, if we store the watermark in snapshot summary metadata as @rdblue said, we can omit the `table.updateProperties()` transaction.






[GitHub] [iceberg] rdblue commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-770026006


   A callback sounds complicated and seems to tie too much of the back end together. I wouldn't want something plugged into the Iceberg component talking to Kafka directly. That sounds like we're trying to work around a framework limitation.




[GitHub] [iceberg] kbendick commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r559870969



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -126,9 +137,16 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
+    Map<String, String> properties = this.table.properties();
+    this.currentWatermark = PropertyUtil.propertyAsLong(properties, CURRENT_WATERMARK, -1L);
+    this.storeWatermark = PropertyUtil.propertyAsBoolean(properties, STORE_WATERMARK, false);
+
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+    this.watermarkState = context.getOperatorStateStore().getListState(WATERMARK_DESCRIPTOR);
+
     if (context.isRestored()) {
+      watermarkPerCheckpoint.putAll(watermarkState.get().iterator().next());

Review comment:
       Actually, on further inspection of the `ListState` interface, it says that passing `null` to `putAll` is a no-op. So I don't think there should be backwards compatibility issues, but should we possibly be (1) logging something if no watermark state is restored and/or (2) doing our own `null` check instead of relying on the documented behavior of `ListState#putAll` staying consistent over time when inserting `null`?
   
   I don't have a strong opinion about either point 1 or point 2, but I thought it might be worth bringing up for discussion.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560644684



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -106,6 +114,9 @@
   // All pending checkpoints states for this function.
   private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
   private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+  private static final ListStateDescriptor<Map<Long, Long>> WATERMARK_DESCRIPTOR = new ListStateDescriptor<>(

Review comment:
       We probably should use `SortedMap` here.
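A `SortedMap` helps because ordered checkpoint IDs let the committer look up the watermark for the highest checkpoint at or below the one being committed, and drop committed entries with a single range operation. A minimal sketch using `TreeMap`; `WatermarkPerCheckpoint` is a hypothetical wrapper, not the PR's code.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class WatermarkPerCheckpoint {
  private final NavigableMap<Long, Long> watermarks = new TreeMap<>();

  void record(long checkpointId, long watermark) {
    watermarks.put(checkpointId, watermark);
  }

  // Watermark of the highest checkpoint <= checkpointId, or null if none was recorded.
  Long watermarkUpTo(long checkpointId) {
    Map.Entry<Long, Long> entry = watermarks.floorEntry(checkpointId);
    return entry == null ? null : entry.getValue();
  }

  // After a successful commit, drops every entry up to and including that checkpoint.
  void pruneUpTo(long committedCheckpointId) {
    watermarks.headMap(committedCheckpointId, true).clear();
  }

  int size() {
    return watermarks.size();
  }
}
```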






[GitHub] [iceberg] dixingxing0 commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762686286


   > @dixingxing0 can you describe the motivation of checkpointing the watermarks in Flink state?
   > 
   > Ryan described our use of watermarks in snapshot metadata. They are used to indicate the data completeness on the ingestion path so that downstream batch consumer jobs can be triggered when data is complete for a window (like hourly).
   
   Thanks @stevenzwu. For the watermark state, I am just following the current restore behavior:
   ```java
   NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
       .newTreeMap(checkpointsState.get().iterator().next())
       .tailMap(maxCommittedCheckpointId, false);
   if (!uncommittedDataFiles.isEmpty()) {
     // Committed all uncommitted data files from the old flink job to iceberg table.
     long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
     commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
   }
   ```
   Since Flink will commit the last uncommitted checkpoint on restore, I think we should also store the correct watermark for that checkpoint.
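Mirroring the restore logic quoted above, a sketch of choosing the watermark to publish for the uncommitted checkpoints, assuming the restored watermarks are keyed by checkpoint ID like `checkpointsState`. `RestoreWatermark` is a hypothetical helper.

```java
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

final class RestoreWatermark {
  private RestoreWatermark() {}

  // Among checkpoints newer than maxCommittedCheckpointId, returns the watermark
  // of the largest one, or null if every restored checkpoint was already committed.
  static Long watermarkForUncommitted(SortedMap<Long, Long> restoredWatermarks, long maxCommittedCheckpointId) {
    NavigableMap<Long, Long> uncommitted =
        new TreeMap<Long, Long>(restoredWatermarks).tailMap(maxCommittedCheckpointId, false);
    return uncommitted.isEmpty() ? null : uncommitted.lastEntry().getValue();
  }
}
```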
   
   Our use case is exactly as you and @rdblue described, except that we don't have multiple writers 😄.





[GitHub] [iceberg] stevenzwu edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-769542927


   We actually implemented this with a slightly different way of calculating the watermark. Instead of using the Flink watermark, we add some additional metadata (min, max, sum, count) per DataFile for the timestamp column. In the committer, we use the min of the per-file mins to decide the watermark value. We never regress the watermark value. That metadata also helps us calculate ingestion-latency metrics (commit time - event/Kafka time), like min, max, and avg.
   
   Just to share, by no means that I am suggesting changing the approach in this PR. It is perfectly good.




[GitHub] [iceberg] dixingxing0 edited a comment on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762679911


   > @dixingxing0, in our implementation, we store watermarks in snapshot summary metadata. I think that's a more appropriate place for it because it is metadata about the snapshot that is produced. We also use a watermark per writer because we write in 3 different AWS regions. So I think it would make sense to be able to name each watermark, possibly with a default if you choose not to name it.
   
   Thanks @rdblue, agree with you.
   Since we will store the watermark in snapshot summary metadata, we should also consider the rewrite action: currently it loses the extra properties in the summary metadata, e.g. `flink.max-committed-checkpoint-id` and `flink.job-id`.
   I think we should copy the extra properties from the newest rewritten snapshot to `RewriteFiles` (the new snapshot).
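A sketch of the copy step, assuming "extra" properties are identified by a key prefix such as `flink.` and "newest" means the highest sequence number. `SnapshotInfo` is a minimal stand-in for snapshot metadata, not Iceberg's `Snapshot` interface; the returned entries could then be applied to the `RewriteFiles` operation through `set(String, String)`, which it inherits from `SnapshotUpdate`.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RewriteSummaryCarryOver {
  // Minimal stand-in for snapshot metadata (hypothetical).
  static final class SnapshotInfo {
    final long sequenceNumber;
    final Map<String, String> summary;

    SnapshotInfo(long sequenceNumber, Map<String, String> summary) {
      this.sequenceNumber = sequenceNumber;
      this.summary = summary;
    }
  }

  private RewriteSummaryCarryOver() {}

  // Returns the prefixed summary entries of the newest rewritten snapshot, or an empty map.
  static Map<String, String> extraPropertiesToCopy(List<SnapshotInfo> rewrittenSnapshots, String prefix) {
    Map<String, String> extras = new HashMap<>();
    rewrittenSnapshots.stream()
        .max(Comparator.comparingLong((SnapshotInfo s) -> s.sequenceNumber))
        .ifPresent(newest -> newest.summary.forEach((key, value) -> {
          if (key.startsWith(prefix)) {
            extras.put(key, value);
          }
        }));
    return extras;
  }
}
```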
   
   To name the watermark, I think we can introduce a new configuration, `flink.watermark-name`:
   ```properties
   # user-specified configuration (defaults shown)
   flink.store-watermark=false
   flink.watermark-name=default
   
   # written by the Flink files committer; the key uses flink.watermark-name as its suffix
   flink.watermark-for-default=the-watermark
   ```
   
   @rdblue what do you think?




[GitHub] [iceberg] dixingxing0 commented on a change in pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on a change in pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#discussion_r560947820



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -305,6 +334,14 @@ public void processElement(StreamRecord<WriteResult> element) {
     this.writeResultsOfCurrentCkpt.add(element.getValue());
   }
 
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    super.processWatermark(mark);
+    if (mark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp()) {

Review comment:
       As you said before, we use the watermark to indicate data completeness on the ingestion path, so I think we do not need to store `MAX_WATERMARK` when the Flink job runs in streaming mode.
   If the Flink job runs in batch mode, even if we store a `MAX_WATERMARK`, we still can't know which partition is complete. I think in batch mode we can simply rely on the scheduling system.
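A sketch of the filtering described here. Flink's `Watermark.MAX_WATERMARK` carries `Long.MAX_VALUE` as its timestamp, so comparing timestamps is enough; `WatermarkFilter` is a hypothetical name, and the monotonic update is a defensive assumption rather than something Flink requires.

```java
final class WatermarkFilter {
  // Timestamp carried by Flink's Watermark.MAX_WATERMARK (end of input).
  static final long MAX_WATERMARK_TIMESTAMP = Long.MAX_VALUE;

  private long currentWatermark = Long.MIN_VALUE;

  // Records a watermark unless it is the end-of-input marker, which says
  // nothing about which partitions are complete.
  void onWatermark(long timestamp) {
    if (timestamp == MAX_WATERMARK_TIMESTAMP) {
      return;
    }
    currentWatermark = Math.max(currentWatermark, timestamp);
  }

  long current() {
    return currentWatermark;
  }
}
```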






[GitHub] [iceberg] stevenzwu commented on pull request #2109: Flink: store watermark as iceberg table's property

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-763299851


   @dixingxing0 thanks a lot for the additional context; that is very helpful. I left a few comments.
   
   Regarding the scenario of multiple writer jobs and a single table, I am afraid the additional config won't help, because we are talking about one table here.

