Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/10 03:34:11 UTC

[GitHub] [hudi] voonhous commented on a diff in pull request #6856: [HUDI-4968] Update misleading read.streaming.skip_compaction config

voonhous commented on code in PR #6856:
URL: https://github.com/apache/hudi/pull/6856#discussion_r990894756


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -279,8 +279,9 @@ private FlinkOptions() {
       .defaultValue(false)// default read as batch
       .withDescription("Whether to skip compaction instants for streaming read,\n"
           + "there are two cases that this option can be used to avoid reading duplicates:\n"
-          + "1) you are definitely sure that the consumer reads faster than any compaction instants, "
-          + "usually with delta time compaction strategy that is long enough, for e.g, one week;\n"
+          + "1) `hoodie.compaction.preserve.commit.metadata` is set to `false` and you are definitely sure that the "
+          + "consumer reads faster than any compaction instants, usually with delta time compaction strategy that is "

Review Comment:
   Understand that `hoodie.compaction.preserve.commit.metadata` is `true` by default. 
   
   The default values for the configuration keys of interest are as such:
   ```properties
   hoodie.compaction.preserve.commit.metadata=true
   read.streaming.skip_compaction=false
   ```
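
   For context, here is a minimal sketch (the table path and surrounding options are illustrative assumptions, not taken from this PR) of where these keys are typically supplied on the Flink side, i.e. the WITH-clause options of a Hudi streaming-read table:
   
   ```java
   import java.util.Map;
   
   public final class StreamingReadOptionsDemo {
     public static void main(String[] args) {
       // Hypothetical WITH-clause options of a Hudi streaming-read table;
       // the path and table type below are illustrative only.
       Map<String, String> with = Map.of(
           "connector", "hudi",
           "path", "file:///tmp/hudi/t1",            // hypothetical path
           "table.type", "MERGE_ON_READ",
           "read.streaming.enabled", "true",
           "read.streaming.skip_compaction", "false" // default value
       );
       // hoodie.compaction.preserve.commit.metadata=true is the writer-side default.
       with.forEach((k, v) -> System.out.println(k + " = " + v));
     }
   }
   ```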
   
   The phrasing of the `read.streaming.skip_compaction` configuration's description is VERY CONFUSING. 
   
   As of now, the description reads as follows:
   
   ```txt
   Whether to skip compaction instants for streaming read, there are two cases that this option can be used to avoid reading duplicates:
   
   1) you are definitely sure that the consumer reads faster than any compaction instants, usually with delta time compaction strategy that is long enough, for e.g, one week;
   
   2) changelog mode is enabled, this option is a solution to keep data integrity
   ```
   
   What I understand from this is:
   You can enable `read.streaming.skip_compaction` (i.e. set the configuration value to `true`) to prevent reading duplicates under these two cases:
   
   1) you are definitely sure that the consumer reads faster than any compaction instants, usually with delta time compaction strategy that is long enough, for e.g, one week;
   
   # Consumer reads FASTER than compaction instants
   A consumer reading faster than any compaction instant means that compaction is slower than the consumer's reads.
   
   As such, compaction will complete after reading. A concrete example is shown below. I am not sure if my understanding is correct; please correct me on any conceptual mistakes.
   
   ## Read iteration 1
   Say the commit timeline looks like this at read iteration 1:
   1. compaction plan at `instant-0` is created but still ongoing
   2. deltacommit at `instant-1` has completed
   
   The read consumer will only read out newly inserted rows in deltacommit @ `instant-1`.
   
   Issued instant is updated as such:
   ```txt
   issuedInstant=1
   ```
   
   ```
   0.compaction.requested
   0.compaction.inflight
   
   1.deltacommit.requested
   1.deltacommit.inflight
   1.deltacommit
   ```
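
   As a side note, the following is a minimal sketch (hypothetical names, not Hudi's actual monitoring code) of how the `issuedInstant` above is carried forward to bound the next read iteration:
   
   ```java
   import java.util.List;
   
   public final class IssuedInstantDemo {
     public static void main(String[] args) {
       String issuedInstant = null;
       // Newest completed deltacommit observed in each read iteration:
       for (String newestCompleted : List.of("1", "2")) {
         String start = issuedInstant;    // exclusive lower bound; null means a full read
         issuedInstant = newestCompleted; // becomes the next iteration's start
         System.out.printf("InstantRange[type=OPEN_CLOSE, start=%s, end=%s]%n",
             start, issuedInstant);
       }
     }
   }
   ```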
   
   ## Read iteration 2
   On read iteration 2:
   1. compaction plan at `instant-0` has completed
   2. deltacommit at `instant-2` has completed
   
   Since `issuedInstant=1`, the InstantRange is as follows:
   ```txt
   InstantRange[type=OPEN_CLOSE, start=1, end=2]
   ```
   
   The read consumer will read out newly inserted rows in deltacommit @ `instant-2`. At this point, the rows generated by the base parquet files at compaction `instant-0` will be ignored, since the data file iterators skip rows whose `_hoodie_commit_time` lies outside the InstantRange (a sketch of this check is shown after the timeline below).
   
   ```
   0.compaction.requested
   0.compaction.inflight
   0.commit
   
   1.deltacommit.requested
   1.deltacommit.inflight
   1.deltacommit
   
   2.deltacommit.requested
   2.deltacommit.inflight
   2.deltacommit
   ```
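
   To make the skip concrete, below is a minimal sketch (a hypothetical class, not Hudi's actual `InstantRange` implementation) of an OPEN_CLOSE range check applied to each row's `_hoodie_commit_time`:
   
   ```java
   public final class OpenCloseInstantRange {
     private final String start; // exclusive lower bound
     private final String end;   // inclusive upper bound
   
     public OpenCloseInstantRange(String start, String end) {
       this.start = start;
       this.end = end;
     }
   
     // Instant timestamps are strings whose lexicographic order matches time order.
     public boolean isInRange(String commitTime) {
       return commitTime.compareTo(start) > 0 && commitTime.compareTo(end) <= 0;
     }
   
     public static void main(String[] args) {
       OpenCloseInstantRange range = new OpenCloseInstantRange("1", "2");
       System.out.println(range.isInRange("0")); // false: rows compacted at instant-0 are skipped
       System.out.println(range.isInRange("2")); // true: rows from deltacommit at instant-2 are read
     }
   }
   ```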
   
   ## Configuration description
   Circling back to the configuration description: when a consumer is reading faster than a compaction instant, the possibility of duplicated data being read out (due to the same rows existing in both the base parquet file and the delta log file) is virtually 0.
   
   `read.streaming.skip_compaction` SHOULD NOT be used to avoid duplicates if the **consumer reads faster than any compaction instants**. This is why I feel the original description is misleading.
   
   ## Proposed change
   That being said, my phrasing in the initial change is pretty misleading too.
   
   What I was trying to say is:
   
   `read.streaming.skip_compaction` should only be enabled when `hoodie.compaction.preserve.commit.metadata` has been modified to its non-default value of `false`, and a compaction plan completes before the deltacommit to be read in the next read iteration.
   
   Building upon the previous examples, suppose a read iteration 3 is performed with the following configurations:
   ```properties
   # non-default value
   hoodie.compaction.preserve.commit.metadata=false
   # default value
   read.streaming.skip_compaction=false
   ```
   
   ```txt
   3.compaction.requested
   3.compaction.inflight
   3.commit
   
   4.deltacommit.requested
   4.deltacommit.inflight
   4.deltacommit
   
   issuedInstant=2
   InstantRange[type=OPEN_CLOSE, start=2, end=4]
   ```
   
   At this point, the newly compacted rows (which have already been read during read iteration 2) in the base parquet files generated at `instant-3` will have a `_hoodie_commit_time` of `3`, since commit metadata is not preserved and the original commit time is overwritten.
   
   This falls within the InstantRange of the current read iteration, so the records that were already read in read iteration 2 are read out again, i.e. duplicated data is read out.
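
   Reusing the hypothetical `OpenCloseInstantRange` sketch from earlier, the duplicate read can be reproduced:
   
   ```java
   // Continues the hypothetical OpenCloseInstantRange sketch from above.
   OpenCloseInstantRange range = new OpenCloseInstantRange("2", "4");
   
   // preserve.commit.metadata=false: compacted rows are rewritten with commit time "3".
   System.out.println(range.isInRange("3")); // true  -> rows already read in iteration 2 are emitted again
   
   // preserve.commit.metadata=true (default): compacted rows keep their original
   // commit time, e.g. "1" from the deltacommit that first wrote them.
   System.out.println(range.isInRange("1")); // false -> compacted rows are skipped, no duplicates
   ```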
   
   As such, `read.streaming.skip_compaction` should be used to avoid reading duplicates in such a case, i.e. when the user is definitely sure that compaction instants complete faster than the next deltacommit to be read in.
   
   # Disclaimer
   I am ignoring changelog mode's description as I have yet to test this configuration under such a use case.
   
   We will also need to sync up the changes here with #6855.


