Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/30 16:58:47 UTC

[GitHub] [hudi] qjqqyy opened a new pull request, #5718: [HUDI-4006] failOnDataLoss on delta-streamer kafka sources

qjqqyy opened a new pull request, #5718:
URL: https://github.com/apache/hudi/pull/5718

   ## What is the purpose of the pull request
   
   New feature: fail fast when the Kafka checkpoint offset goes out of bounds (OOB), i.e. falls below the earliest offset still available on the topic (see #5400).
   
   ## Brief change log
   
   - Add a new config key, `hoodie.deltastreamer.source.kafka.enable.failOnDataLoss`.
   - When `failOnDataLoss=false` (the default, matching current behaviour), still reset to the earliest available offsets, but log a warning instead of doing so silently.
   - When `failOnDataLoss=true`, fail explicitly instead of resetting the offsets.
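
   The following is a minimal sketch of the decision described in the change log, modelled on the `fetchValidOffsets` diff in the review comments. It is not the PR's code. It makes these assumptions:
   - Partitions are keyed by a plain `Integer` instead of Kafka's `TopicPartition`.
   - `resolveStartOffsets` is a hypothetical name.
   - The warning goes to `System.err` instead of Hudi's logger.
   - `IllegalStateException` stands in for whatever exception the PR actually throws.

   ```java
   import java.util.Map;

   public class OffsetResetSketch {

     // Hypothetical helper: picks the offsets to resume from.
     // checkpointOffsets: offsets stored in the last checkpoint.
     // earliestOffsets:   earliest offsets still available on the topic.
     // failOnDataLoss:    value of the failOnDataLoss setting.
     static Map<Integer, Long> resolveStartOffsets(Map<Integer, Long> checkpointOffsets,
                                                   Map<Integer, Long> earliestOffsets,
                                                   boolean failOnDataLoss) {
       // Out of bounds: some partition's checkpoint is older than the earliest
       // retained offset, so the records in between can no longer be read.
       boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
           .anyMatch(e -> e.getValue() < earliestOffsets.get(e.getKey()));
       if (!isCheckpointOutOfBounds) {
         return checkpointOffsets;
       }
       if (failOnDataLoss) {
         // failOnDataLoss=true: fail fast (the exception type is an assumption).
         throw new IllegalStateException("Checkpoint offsets " + checkpointOffsets
             + " are behind the earliest available offsets " + earliestOffsets);
       }
       // Default: warn, then fall back to the earliest offsets as before.
       System.err.println("WARN: checkpoint out of bounds, resetting to earliest offsets "
           + earliestOffsets);
       return earliestOffsets;
     }

     public static void main(String[] args) {
       Map<Integer, Long> checkpoint = Map.of(0, 5L, 1, 120L);
       Map<Integer, Long> earliest = Map.of(0, 10L, 1, 100L);
       // Partition 0 is behind (5 < 10), so this warns and returns the earliest offsets.
       System.out.println(resolveStartOffsets(checkpoint, earliest, false));
     }
   }
   ```

   Keeping the default at `false` means existing pipelines see only the new warning. Setting the flag to `true` turns the same out-of-bounds input into a hard failure.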
   
   ## Verify this pull request
   
   This change added tests and can be verified as follows:
   
     - Added a unit test for the newly introduced behaviour
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #5718: [HUDI-4006] failOnDataLoss on delta-streamer kafka sources

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5718:
URL: https://github.com/apache/hudi/pull/5718#issuecomment-1141358767

   ## CI report:
   
   * 80d485ee8b44c3dca545589043a363520423da7a Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=9010) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] qjqqyy commented on a diff in pull request #5718: [HUDI-4006] failOnDataLoss on delta-streamer kafka sources

Posted by GitBox <gi...@apache.org>.
qjqqyy commented on code in PR #5718:
URL: https://github.com/apache/hudi/pull/5718#discussion_r894988764


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -329,9 +334,19 @@ private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
                                                         Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
     Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
     Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
-    boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
+    boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
         .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
-    return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+    if (isCheckpointOutOfBounds) {
+      if (this.props.getBoolean(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Config.ENABLE_FAIL_ON_DATA_LOSS.defaultValue())) {

Review Comment:
   `props` is a `TypedProperties`; `getBooleanOrDefault` is only available on a `HoodieConfig`.
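
   Because `getBooleanOrDefault` is only available on `HoodieConfig`, the diff reads the flag with an explicit key and default instead. The sketch below shows that lookup pattern. It assumes plain `java.util.Properties` as a stand-in for `TypedProperties`. `BoolConfig` and `getBoolean` are hypothetical helpers: they mirror the shape of `Config.ENABLE_FAIL_ON_DATA_LOSS.key()`, `.defaultValue()` and `props.getBoolean(...)` in the diff, but they are not Hudi's classes.

   ```java
   import java.util.Properties;

   public class FailOnDataLossConfigSketch {

     // Hypothetical stand-in for a config property: a key plus its default value.
     static final class BoolConfig {
       private final String key;
       private final boolean defaultValue;

       BoolConfig(String key, boolean defaultValue) {
         this.key = key;
         this.defaultValue = defaultValue;
       }

       String key() { return key; }
       boolean defaultValue() { return defaultValue; }
     }

     // Key taken from the PR description; default false keeps the old behaviour.
     static final BoolConfig ENABLE_FAIL_ON_DATA_LOSS =
         new BoolConfig("hoodie.deltastreamer.source.kafka.enable.failOnDataLoss", false);

     // Hypothetical getBoolean(key, defaultValue) lookup on a Properties object:
     // an absent key yields the default, otherwise the stored string is parsed.
     static boolean getBoolean(Properties props, String key, boolean defaultValue) {
       String value = props.getProperty(key);
       return value == null ? defaultValue : Boolean.parseBoolean(value.trim());
     }

     public static void main(String[] args) {
       Properties props = new Properties();
       // Key unset: falls back to the default.
       System.out.println(getBoolean(props, ENABLE_FAIL_ON_DATA_LOSS.key(),
           ENABLE_FAIL_ON_DATA_LOSS.defaultValue())); // false
       props.setProperty(ENABLE_FAIL_ON_DATA_LOSS.key(), "true");
       System.out.println(getBoolean(props, ENABLE_FAIL_ON_DATA_LOSS.key(),
           ENABLE_FAIL_ON_DATA_LOSS.defaultValue())); // true
     }
   }
   ```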





[GitHub] [hudi] hudi-bot commented on pull request #5718: [HUDI-4006] failOnDataLoss on delta-streamer kafka sources

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5718:
URL: https://github.com/apache/hudi/pull/5718#issuecomment-1141421442

   ## CI report:
   
   * 80d485ee8b44c3dca545589043a363520423da7a Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=9010) 
   




[GitHub] [hudi] hudi-bot commented on pull request #5718: [HUDI-4006] failOnDataLoss on delta-streamer kafka sources

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5718:
URL: https://github.com/apache/hudi/pull/5718#issuecomment-1141356217

   ## CI report:
   
   * 80d485ee8b44c3dca545589043a363520423da7a UNKNOWN
   




[GitHub] [hudi] nsivabalan commented on a diff in pull request #5718: [HUDI-4006] failOnDataLoss on delta-streamer kafka sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on code in PR #5718:
URL: https://github.com/apache/hudi/pull/5718#discussion_r893980927


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -329,9 +334,19 @@ private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
                                                         Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
     Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
     Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
-    boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
+    boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
         .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
-    return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+    if (isCheckpointOutOfBounds) {
+      if (this.props.getBoolean(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Config.ENABLE_FAIL_ON_DATA_LOSS.defaultValue())) {

Review Comment:
   Please use `getBooleanOrDefault`.


