Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/03 09:25:58 UTC

[GitHub] [hudi] yuzhaojing opened a new pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

yuzhaojing opened a new pull request #3029:
URL: https://github.com/apache/hudi/pull/3029


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *jira: https://issues.apache.org/jira/browse/HUDI-1954*
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624









[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b3ead29) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **increase** coverage by `15.74%`.
   > The diff coverage is `n/a`.
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #3029       +/-   ##
   =============================================
   + Coverage     55.13%   70.88%   +15.74%     
   + Complexity     3864      386     -3478     
   =============================================
     Files           487       54      -433     
     Lines         23608     2016    -21592     
     Branches       2527      241     -2286     
   =============================================
   - Hits          13016     1429    -11587     
   + Misses         9435      454     -8981     
   + Partials       1157      133     -1024     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `?` | |
   | hudiclient | `?` | |
   | hudicommon | `?` | |
   | hudiflink | `?` | |
   | hudihadoopmr | `?` | |
   | hudisparkdatasource | `?` | |
   | hudisync | `?` | |
   | huditimelineservice | `?` | |
   | hudiutilities | `70.88% <ø> (+0.04%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...rg/apache/hudi/common/model/HoodieRollingStat.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZVJvbGxpbmdTdGF0LmphdmE=) | | |
   | [...e/hudi/common/table/timeline/dto/FileSliceDTO.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3RpbWVsaW5lL2R0by9GaWxlU2xpY2VEVE8uamF2YQ==) | | |
   | [.../hudi/table/format/cow/CopyOnWriteInputFormat.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvY293L0NvcHlPbldyaXRlSW5wdXRGb3JtYXQuamF2YQ==) | | |
   | [.../apache/hudi/exception/HoodieCompactException.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhjZXB0aW9uL0hvb2RpZUNvbXBhY3RFeGNlcHRpb24uamF2YQ==) | | |
   | [...udi/common/bootstrap/index/NoOpBootstrapIndex.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2Jvb3RzdHJhcC9pbmRleC9Ob09wQm9vdHN0cmFwSW5kZXguamF2YQ==) | | |
   | [...n/java/org/apache/hudi/cli/commands/SparkMain.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL1NwYXJrTWFpbi5qYXZh) | | |
   | [...rg/apache/hudi/common/model/HoodieFileGroupId.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZUZpbGVHcm91cElkLmphdmE=) | | |
   | [.../org/apache/hudi/common/metrics/LocalRegistry.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21ldHJpY3MvTG9jYWxSZWdpc3RyeS5qYXZh) | | |
   | [...che/hudi/common/table/timeline/dto/InstantDTO.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3RpbWVsaW5lL2R0by9JbnN0YW50RFRPLmphdmE=) | | |
   | [...va/org/apache/hudi/cli/commands/CleansCommand.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0NsZWFuc0NvbW1hbmQuamF2YQ==) | | |
   | ... and [423 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
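You can re-derive the headline percentages from the rows of the Coverage Diff table. The sketch below assumes that Codecov computes coverage as hits divided by tracked lines, where lines = hits + misses + partials. The rows do satisfy that sum: 1429 + 454 + 133 = 2016 and 13016 + 9435 + 1157 = 23608. On that assumption, the +15.74% jump seems to compare 487 files on `master` against only 54 files on the PR head, where only the `hudiutilities` flag has data. It would not reflect the change itself. The class name `CoverageCheck` is illustrative only.

```java
// Sketch: reproduce Codecov's headline figures from the "Coverage Diff" rows.
// Assumption (not confirmed by the report): coverage = hits / lines,
// where lines = hits + misses + partials.
public class CoverageCheck {

    /** Coverage as a percentage of tracked lines. */
    static double coverage(long hits, long misses, long partials) {
        long lines = hits + misses + partials;
        return 100.0 * hits / lines;
    }

    public static void main(String[] args) {
        double master = coverage(13016, 9435, 1157); // 23608 lines in 487 files
        double pr = coverage(1429, 454, 133);        // 2016 lines in 54 files
        // printf rounds half-up, so the delta prints as +15.75 where Codecov shows 15.74.
        System.out.printf("master=%.2f%% pr=%.2f%% delta=%+.2f%%%n", master, pr, pr - master);
    }
}
```

The small mismatch in the last digit suggests that Codecov truncates rather than rounds its displayed delta. Treat that as an observation, not documented behavior.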





[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624









[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645313144



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
       Already logged in method `flushBucket`.
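In the `flushBuffer` branch, the hunk finds the largest bucket by sorting every bucket in descending `detector.totalSize` order and taking `get(0)`. The sketch below shows an equivalent O(n) selection with `Stream.max`. It is an alternative for comparison, not what the PR does. `DataBucketStub` is a hypothetical stand-in for the real `DataBucket` and its detector.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch: pick the bucket with the largest buffered size without sorting all buckets.
// DataBucketStub is hypothetical; it only mirrors detector.totalSize from the hunk.
public class MaxBucketSelection {

    static final class DataBucketStub {
        final String key;
        final long totalSize; // bytes buffered in this bucket
        DataBucketStub(String key, long totalSize) {
            this.key = key;
            this.totalSize = totalSize;
        }
    }

    /** Single pass over the buckets; empty when there are no buckets at all. */
    static Optional<DataBucketStub> largest(Map<String, DataBucketStub> buckets) {
        return buckets.values().stream()
            .max(Comparator.comparingLong((DataBucketStub b) -> b.totalSize));
    }

    public static void main(String[] args) {
        Map<String, DataBucketStub> buckets = new HashMap<>();
        buckets.put("p1", new DataBucketStub("p1", 512));
        buckets.put("p2", new DataBucketStub("p2", 4096));
        buckets.put("p3", new DataBucketStub("p3", 1024));
        largest(buckets).ifPresent(b -> System.out.println("flush " + b.key)); // prints "flush p2"
    }
}
```

Returning an `Optional` makes the empty case explicit. `sortedBuckets.get(0)` would throw on an empty list instead.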







[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645344681



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
       Yes, this makes the message clearer.







[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645335801



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
       Fine. How about changing the message to `The buffer size hits the threshold {}, but still flush the max size data bucket failed`?







[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (66d5fed) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `45.85%`.
   > The diff coverage is `n/a`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #3029       +/-   ##
   ============================================
   - Coverage     55.13%   9.27%   -45.86%     
   + Complexity     3864      48     -3816     
   ============================================
     Files           487      54      -433     
     Lines         23608    2016    -21592     
     Branches       2527     241     -2286     
   ============================================
   - Hits          13016     187    -12829     
   + Misses         9435    1816     -7619     
   + Partials       1157      13     -1144     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `?` | |
   | hudiclient | `?` | |
   | hudicommon | `?` | |
   | hudiflink | `?` | |
   | hudihadoopmr | `?` | |
   | hudisparkdatasource | `?` | |
   | hudisync | `?` | |
   | huditimelineservice | `?` | |
   | hudiutilities | `9.27% <ø> (-61.56%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [463 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   





[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645315126



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
       The log in method `flushBucket` only says that the flush failed. This log also tells the user that the writer may run out of memory (OOM).
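The change guards `tracer.countDown(...)` and `reset()` behind the boolean that `flushBucket` returns. In the old code, a failed flush appears to have cleared the bucket anyway, so its buffered records were discarded. With the change, those records stay buffered and keep counting against the buffer budget. That is why repeated failures at the buffer threshold can end in the OOM the warning refers to. The sketch below is minimal. `Bucket`, `Tracer` and the `flusher` predicate are hypothetical stand-ins, not Hudi's `DataBucket` or tracer classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch of the "reset only when the flush succeeds" pattern discussed in this PR.
// Bucket, Tracer and the flusher predicate are hypothetical, not Hudi classes.
public class FlushGuardSketch {

    static final class Bucket {
        final List<String> records = new ArrayList<>();
        long totalSize;
        void add(String record, long size) { records.add(record); totalSize += size; }
        void reset() { records.clear(); totalSize = 0; }
    }

    static final class Tracer {
        long bufferSize; // bytes buffered across all buckets
        void trace(long size) { bufferSize += size; }
        void countDown(long size) { bufferSize -= size; }
    }

    /** Releases the bucket and its accounting only if the flush reports success. */
    static boolean flushAndMaybeReset(Bucket bucket, Tracer tracer, Predicate<Bucket> flusher) {
        if (flusher.test(bucket)) {
            tracer.countDown(bucket.totalSize);
            bucket.reset();
            return true;
        }
        // Flush failed: keep the records so a later attempt can still write them,
        // at the cost of holding their memory.
        return false;
    }

    public static void main(String[] args) {
        Tracer tracer = new Tracer();
        Bucket bucket = new Bucket();
        bucket.add("r1", 100); tracer.trace(100);
        bucket.add("r2", 200); tracer.trace(200);

        boolean ok = flushAndMaybeReset(bucket, tracer, b -> false); // simulated failure
        System.out.println(ok + " " + bucket.records.size() + " " + tracer.bufferSize); // prints "false 2 300"

        ok = flushAndMaybeReset(bucket, tracer, b -> true); // simulated success
        System.out.println(ok + " " + bucket.records.size() + " " + tracer.bufferSize); // prints "true 0 0"
    }
}
```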







[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (eb3cf90) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **increase** coverage by `15.69%`.
   > The diff coverage is `n/a`.
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #3029       +/-   ##
   =============================================
   + Coverage     55.13%   70.83%   +15.69%     
   + Complexity     3864      385     -3479     
   =============================================
     Files           487       54      -433     
     Lines         23608     2016    -21592     
     Branches       2527      241     -2286     
   =============================================
   - Hits          13016     1428    -11588     
   + Misses         9435      454     -8981     
   + Partials       1157      134     -1023     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `?` | |
   | hudiclient | `?` | |
   | hudicommon | `?` | |
   | hudiflink | `?` | |
   | hudihadoopmr | `?` | |
   | hudisparkdatasource | `?` | |
   | hudisync | `?` | |
   | huditimelineservice | `?` | |
   | hudiutilities | `70.83% <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...able/timeline/versioning/AbstractMigratorBase.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3RpbWVsaW5lL3ZlcnNpb25pbmcvQWJzdHJhY3RNaWdyYXRvckJhc2UuamF2YQ==) | | |
   | [...i/common/util/collection/ExternalSpillableMap.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvY29sbGVjdGlvbi9FeHRlcm5hbFNwaWxsYWJsZU1hcC5qYXZh) | | |
   | [...i/common/table/view/HoodieTableFileSystemView.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3ZpZXcvSG9vZGllVGFibGVGaWxlU3lzdGVtVmlldy5qYXZh) | | |
   | [...til/jvm/HotSpotMemoryLayoutSpecification64bit.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvanZtL0hvdFNwb3RNZW1vcnlMYXlvdXRTcGVjaWZpY2F0aW9uNjRiaXQuamF2YQ==) | | |
   | [.../common/util/queue/IteratorBasedQueueProducer.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvcXVldWUvSXRlcmF0b3JCYXNlZFF1ZXVlUHJvZHVjZXIuamF2YQ==) | | |
   | [...java/org/apache/hudi/table/format/FormatUtils.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvRm9ybWF0VXRpbHMuamF2YQ==) | | |
   | [...a/org/apache/hudi/avro/HoodieAvroWriteSupport.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvV3JpdGVTdXBwb3J0LmphdmE=) | | |
   | [.../org/apache/hudi/common/model/HoodieWriteStat.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZVdyaXRlU3RhdC5qYXZh) | | |
   | [...he/hudi/table/format/cow/AbstractColumnReader.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvY293L0Fic3RyYWN0Q29sdW1uUmVhZGVyLmphdmE=) | | |
   | [...udi/common/util/queue/BoundedInMemoryExecutor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvcXVldWUvQm91bmRlZEluTWVtb3J5RXhlY3V0b3IuamF2YQ==) | | |
   | ... and [422 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645243751



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      boolean flush = flushBucket(bucket);
+      if (flush) {
+        this.tracer.countDown(bucket.detector.totalSize);

Review comment:
       fix

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      boolean flush = flushBucket(bucket);
+      if (flush) {
+        this.tracer.countDown(bucket.detector.totalSize);

Review comment:
       ok

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
      The log in method `flushBucket` only says that the flush failed; this log warns the user that the job may OOM.
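      A minimal sketch of the invariant this hunk enforces, written as self-contained Java with simplified stand-ins. `Bucket`, `SizeTracer` and `flushAndMaybeReset` are hypothetical names, not Hudi classes. The buffered size is counted down and the bucket is cleared only when the flush reports success. On failure the records stay buffered and a warning is logged, which is why repeated failures here can eventually exhaust memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.logging.Logger;

/**
 * Simplified model of "only reset the bucket when the flush succeeds".
 * Bucket, SizeTracer and the writer predicate are hypothetical stand-ins
 * for DataBucket, the buffer size tracer and flushBucket.
 */
public class ResetOnSuccessSketch {
  private static final Logger LOG = Logger.getLogger(ResetOnSuccessSketch.class.getName());

  /** Buffered records plus their accumulated size in bytes. */
  static final class Bucket {
    final List<String> records = new ArrayList<>();
    long totalSize;

    void add(String record, long size) {
      records.add(record);
      totalSize += size;
    }

    void reset() {
      records.clear();
      totalSize = 0;
    }
  }

  /** Tracks the total buffered size across all buckets. */
  static final class SizeTracer {
    long bufferSize;

    void trace(long size) {
      bufferSize += size;
    }

    void countDown(long size) {
      bufferSize -= size;
    }
  }

  /** Releases memory and clears the bucket only if the writer reports success. */
  static boolean flushAndMaybeReset(Bucket bucket, SizeTracer tracer, Predicate<List<String>> writer) {
    if (writer.test(bucket.records)) {
      tracer.countDown(bucket.totalSize); // must run before reset(), which zeroes totalSize
      bucket.reset();
      return true;
    }
    // Records stay buffered for a later retry, so memory usage keeps growing.
    LOG.warning("Buffer size exceeds the limit but the flush failed; records remain buffered.");
    return false;
  }

  public static void main(String[] args) {
    Bucket bucket = new Bucket();
    SizeTracer tracer = new SizeTracer();
    bucket.add("r1", 10);
    tracer.trace(10);

    flushAndMaybeReset(bucket, tracer, records -> false); // simulated failure
    System.out.println(bucket.records.size() + " " + tracer.bufferSize); // 1 10

    flushAndMaybeReset(bucket, tracer, records -> true); // simulated success
    System.out.println(bucket.records.size() + " " + tracer.bufferSize); // 0 0
  }
}
```

      The order inside the success branch matters in this sketch: `reset()` zeroes `totalSize`, so calling it before `countDown` would release nothing from the tracer.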

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }
     }
     bucket.records.add(item);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBucket(DataBucket bucket) {
+  private boolean flushBucket(DataBucket bucket) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;

Review comment:
      I agree with it.
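      A minimal sketch of a `flushBucket` that returns `boolean` instead of `void`, so the caller can tell a skipped flush from a real one. `PendingInstantFlusher` is a hypothetical class, and its `Supplier<String>` stands in for `writeClient.getLastPendingInstant(this.actionType)` from the diff; records are "written" to an in-memory list purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical flusher that reports whether it actually wrote anything. */
public class PendingInstantFlusher {
  private final Supplier<String> lastPendingInstant;
  final List<String> written = new ArrayList<>();

  PendingInstantFlusher(Supplier<String> lastPendingInstant) {
    this.lastPendingInstant = lastPendingInstant;
  }

  /** Returns false and leaves the records untouched when there is no inflight instant. */
  boolean flushBucket(List<String> records) {
    String instant = lastPendingInstant.get();
    if (instant == null) {
      // e.g. an empty checkpoint that produced no input data
      return false;
    }
    for (String r : records) {
      written.add(instant + ":" + r);
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> records = List.of("a", "b");
    PendingInstantFlusher noInstant = new PendingInstantFlusher(() -> null);
    System.out.println(noInstant.flushBucket(records)); // false
    PendingInstantFlusher withInstant = new PendingInstantFlusher(() -> "001"); // arbitrary instant time
    System.out.println(withInstant.flushBucket(records) + " " + withInstant.written); // true [001:a, 001:b]
  }
}
```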

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
       Yes, this makes the message clearer.







[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b3ead29) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `45.85%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #3029       +/-   ##
   ============================================
   - Coverage     55.13%   9.27%   -45.86%     
   + Complexity     3864      48     -3816     
   ============================================
     Files           487      54      -433     
     Lines         23608    2016    -21592     
     Branches       2527     241     -2286     
   ============================================
   - Hits          13016     187    -12829     
   + Misses         9435    1816     -7619     
   + Partials       1157      13     -1144     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `?` | |
   | hudiclient | `?` | |
   | hudicommon | `?` | |
   | hudiflink | `?` | |
   | hudihadoopmr | `?` | |
   | hudisparkdatasource | `?` | |
   | hudisync | `?` | |
   | huditimelineservice | `?` | |
   | hudiutilities | `9.27% <ø> (-61.56%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [463 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   





[GitHub] [hudi] codecov-commenter commented on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (eb3cf90) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `45.85%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #3029       +/-   ##
   ============================================
   - Coverage     55.13%   9.27%   -45.86%     
   + Complexity     3864      48     -3816     
   ============================================
     Files           487      54      -433     
     Lines         23608    2016    -21592     
     Branches       2527     241     -2286     
   ============================================
   - Hits          13016     187    -12829     
   + Misses         9435    1816     -7619     
   + Partials       1157      13     -1144     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `?` | |
   | hudiclient | `?` | |
   | hudicommon | `?` | |
   | hudiflink | `?` | |
   | hudihadoopmr | `?` | |
   | hudisparkdatasource | `?` | |
   | hudisync | `?` | |
   | huditimelineservice | `?` | |
   | hudiutilities | `9.27% <ø> (-61.56%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [463 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   





[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645336213



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }
     }
     bucket.records.add(item);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBucket(DataBucket bucket) {
+  private boolean flushBucket(DataBucket bucket) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;

Review comment:
      Actually, the current code base should never hit this condition now: the coordinator would reuse the last instant if there is no write metadata to commit.







[GitHub] [hudi] codecov-commenter commented on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (eb3cf90) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `45.85%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #3029       +/-   ##
   ============================================
   - Coverage     55.13%   9.27%   -45.86%     
   + Complexity     3864      48     -3816     
   ============================================
     Files           487      54      -433     
     Lines         23608    2016    -21592     
     Branches       2527     241     -2286     
   ============================================
   - Hits          13016     187    -12829     
   + Misses         9435    1816     -7619     
   + Partials       1157      13     -1144     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `?` | |
   | hudiclient | `?` | |
   | hudicommon | `?` | |
   | hudiflink | `?` | |
   | hudihadoopmr | `?` | |
   | hudisparkdatasource | `?` | |
   | hudisync | `?` | |
   | huditimelineservice | `?` | |
   | hudiutilities | `9.27% <ø> (-61.56%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [463 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   



[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645277018



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      boolean flush = flushBucket(bucket);
+      if (flush) {
+        this.tracer.countDown(bucket.detector.totalSize);

Review comment:
      Can we just inline this flag to make it more concise?
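
The pattern under review can be sketched in isolation. The bucket is reset and the tracer counted down only when the flush actually happened, and the result of `flushBucket` is used inline rather than through a local flag. The `DataBucket`, `Tracer` and `pendingInstant` names below are simplified, hypothetical stand-ins rather than Hudi's real classes. The sketch also assumes that a `null` pending instant is the only way a flush can fail.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: reset a bucket only when its flush reports success. */
public class ResetOnSuccessSketch {

  /** Simplified stand-in for a buffered data bucket. */
  static class DataBucket {
    final List<String> records = new ArrayList<>();
    long totalSize;

    void add(String record, long size) {
      records.add(record);
      totalSize += size;
    }

    void reset() {
      records.clear();
      totalSize = 0;
    }
  }

  /** Simplified stand-in for the buffer-size tracer. */
  static class Tracer {
    long bufferSize;

    void countDown(long size) {
      bufferSize -= size;
    }
  }

  final Tracer tracer = new Tracer();
  /** Assumed failure mode: no pending instant means nothing is written. */
  String pendingInstant;
  final List<String> written = new ArrayList<>();

  /** Returns true only if the bucket's records were handed to the writer. */
  boolean flushBucket(DataBucket bucket) {
    if (pendingInstant == null) {
      return false;
    }
    written.addAll(bucket.records);
    return true;
  }

  void onBucketFull(DataBucket bucket) {
    // Result used inline; countDown reads totalSize before reset() clears it.
    if (flushBucket(bucket)) {
      tracer.countDown(bucket.totalSize);
      bucket.reset();
    }
  }

  public static void main(String[] args) {
    ResetOnSuccessSketch fn = new ResetOnSuccessSketch();
    DataBucket bucket = new DataBucket();
    bucket.add("r1", 10);
    fn.tracer.bufferSize = 10;

    fn.onBucketFull(bucket); // no pending instant: records are kept
    System.out.println(bucket.records.size() + " " + fn.tracer.bufferSize); // 1 10

    fn.pendingInstant = "001";
    fn.onBucketFull(bucket); // flush succeeds: bucket reset, tracer decremented
    System.out.println(bucket.records.size() + " " + fn.tracer.bufferSize); // 0 0
  }
}
```

The ordering inside the `if` matters in this sketch. Calling `reset()` before `countDown()` would decrement the tracer by zero.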





[GitHub] [hudi] garyli1019 merged pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
garyli1019 merged pull request #3029:
URL: https://github.com/apache/hudi/pull/3029


   



[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5cf821b) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `0.01%`.
   > The diff coverage is `45.45%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3029      +/-   ##
   ============================================
   - Coverage     55.13%   55.12%   -0.02%     
   - Complexity     3864     3866       +2     
   ============================================
     Files           487      487              
     Lines         23608    23613       +5     
     Branches       2527     2530       +3     
   ============================================
     Hits          13016    13016              
   - Misses         9435     9439       +4     
   - Partials       1157     1158       +1     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `39.55% <ø> (ø)` | |
   | hudiclient | `∅ <ø> (∅)` | |
   | hudicommon | `50.31% <ø> (-0.01%)` | :arrow_down: |
   | hudiflink | `63.25% <45.45%> (-0.09%)` | :arrow_down: |
   | hudihadoopmr | `51.54% <ø> (ø)` | |
   | hudisparkdatasource | `74.28% <ø> (ø)` | |
   | hudisync | `46.44% <ø> (ø)` | |
   | huditimelineservice | `64.36% <ø> (ø)` | |
   | hudiutilities | `70.88% <ø> (+0.04%)` | :arrow_up: |
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...java/org/apache/hudi/sink/StreamWriteFunction.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9zaW5rL1N0cmVhbVdyaXRlRnVuY3Rpb24uamF2YQ==) | `84.84% <45.45%> (-1.02%)` | :arrow_down: |
   | [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `79.31% <0.00%> (-10.35%)` | :arrow_down: |
   | [...va/org/apache/hudi/table/format/FilePathUtils.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvRmlsZVBhdGhVdGlscy5qYXZh) | `66.91% <0.00%> (-1.03%)` | :arrow_down: |
   | [.../hudi/table/format/mor/MergeOnReadInputFormat.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvbW9yL01lcmdlT25SZWFkSW5wdXRGb3JtYXQuamF2YQ==) | `64.75% <0.00%> (-0.40%)` | :arrow_down: |
   | [...ava/org/apache/hudi/exception/HoodieException.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhjZXB0aW9uL0hvb2RpZUV4Y2VwdGlvbi5qYXZh) | `50.00% <0.00%> (ø)` | |
   | [...va/org/apache/hudi/configuration/FlinkOptions.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9jb25maWd1cmF0aW9uL0ZsaW5rT3B0aW9ucy5qYXZh) | `91.66% <0.00%> (+0.26%)` | :arrow_up: |
   | [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `71.18% <0.00%> (+0.33%)` | :arrow_up: |
   | [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `79.68% <0.00%> (+1.56%)` | :arrow_up: |
   



[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645336456



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }
     }
     bucket.records.add(item);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBucket(DataBucket bucket) {
+  private boolean flushBucket(DataBucket bucket) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;

Review comment:
      But we can still keep it as a safeguard.
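
The following is a minimal sketch of keeping the null-instant check as a defensive guard. Instead of returning silently, the method reports failure with `false`, so the caller keeps the records buffered. The `Supplier<String>` stands in for `writeClient.getLastPendingInstant(actionType)`, and the `Consumer<String>` writer is hypothetical. Both are assumptions for illustration only. The log text is copied from the line removed in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Logger;

/** Hypothetical sketch of a flush that keeps the "no pending instant" guard. */
public class GuardedFlushSketch {
  private static final Logger LOG = Logger.getLogger(GuardedFlushSketch.class.getName());

  private final Supplier<String> lastPendingInstant; // assumed stand-in for the write client lookup
  private final Consumer<String> writer;             // hypothetical sink for "instant:record"

  GuardedFlushSketch(Supplier<String> lastPendingInstant, Consumer<String> writer) {
    this.lastPendingInstant = lastPendingInstant;
    this.writer = writer;
  }

  /** Returns false, without writing anything, when no instant is pending. */
  boolean flushBucket(List<String> records) {
    String instant = lastPendingInstant.get();
    if (instant == null) {
      // Expected to be unreachable, but kept as a safeguard: report failure so the
      // caller keeps the records buffered instead of resetting the bucket.
      LOG.info("No inflight instant when flushing data, cancel.");
      return false;
    }
    for (String record : records) {
      writer.accept(instant + ":" + record);
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> sink = new ArrayList<>();
    List<String> records = Arrays.asList("r1", "r2");

    System.out.println(new GuardedFlushSketch(() -> null, sink::add).flushBucket(records)); // false
    System.out.println(new GuardedFlushSketch(() -> "001", sink::add).flushBucket(records)); // true
    System.out.println(sink); // [001:r1, 001:r2]
  }
}
```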





[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645243751



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      boolean flush = flushBucket(bucket);
+      if (flush) {
+        this.tracer.countDown(bucket.detector.totalSize);

Review comment:
      Fixed.





[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645304027



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      boolean flush = flushBucket(bucket);
+      if (flush) {
+        this.tracer.countDown(bucket.detector.totalSize);

Review comment:
      OK.





[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645344295



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }
     }
     bucket.records.add(item);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBucket(DataBucket bucket) {
+  private boolean flushBucket(DataBucket bucket) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;

Review comment:
      I agree.





[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success

Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645239288



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      boolean flush = flushBucket(bucket);
+      if (flush) {
+        this.tracer.countDown(bucket.detector.totalSize);

Review comment:
       flush => flushed

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      boolean flush = flushBucket(bucket);
+      if (flush) {
+        this.tracer.countDown(bucket.detector.totalSize);

Review comment:
      Can we just inline this flag to make it more concise?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
       Already logged in method `flushBucket`.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }

Review comment:
      Fine. How about we change the message to `The buffer size hits the threshold {}, but still flush the max size data bucket failed`?
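
The flush-buffer branch in the diff sorts every bucket only to read element 0. An alternative picks the largest bucket in a single pass with `Stream.max` and returns an `Optional`, which also avoids `get(0)` throwing on an empty map. It is shown here with a hypothetical `Bucket` type. The `{}` in the proposed message looks like an SLF4J-style placeholder. To stay dependency-free, this sketch fills in the threshold with `String.format` instead. It is an illustration, not the merged code.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: choose the max-size bucket without sorting all buckets. */
public class MaxBucketSketch {

  /** Hypothetical bucket that only tracks its buffered size in bytes. */
  static final class Bucket {
    final String id;
    final long totalSize;

    Bucket(String id, long totalSize) {
      this.id = id;
      this.totalSize = totalSize;
    }
  }

  /** One O(n) pass; returns empty instead of throwing when there are no buckets. */
  static Optional<Bucket> largest(Map<String, Bucket> buckets) {
    return buckets.values().stream().max(Comparator.comparingLong(b -> b.totalSize));
  }

  /** The warning text proposed in review, with the threshold filled in. */
  static String flushFailedMessage(long maxBufferSize) {
    return String.format(
        "The buffer size hits the threshold %d, but still flush the max size data bucket failed",
        maxBufferSize);
  }

  public static void main(String[] args) {
    Map<String, Bucket> buckets = new LinkedHashMap<>();
    buckets.put("p1", new Bucket("p1", 10));
    buckets.put("p2", new Bucket("p2", 30));
    buckets.put("p3", new Bucket("p3", 20));
    System.out.println(largest(buckets).map(b -> b.id).orElse("none")); // p2
    System.out.println(flushFailedMessage(256L));
  }
}
```

This sketch does not establish whether the real code can reach this branch with an empty bucket map. The `Optional` only makes that case explicit.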

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }
     }
     bucket.records.add(item);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBucket(DataBucket bucket) {
+  private boolean flushBucket(DataBucket bucket) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;

Review comment:
      Actually, the current code base should never hit this condition now: the coordinator would reuse the last instant if there is no write metadata to commit.
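
The reasoning above can be illustrated with a deliberately simplified, hypothetical coordinator model. This is not Hudi's actual coordinator. The model assumes that a new instant is started only after something was committed on checkpoint completion. With no write metadata, the current instant simply stays pending, so `lastPendingInstant()` never returns `null` after start-up. All names are invented for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: an instant stays pending until some write metadata is committed. */
public class InstantReuseSketch {
  private int nextId = 1;
  private String pendingInstant = newInstant(); // started eagerly, so never null
  private final List<String> writeMetadata = new ArrayList<>();
  final List<String> committed = new ArrayList<>();

  private String newInstant() {
    return String.format("%03d", nextId++);
  }

  String lastPendingInstant() {
    return pendingInstant;
  }

  void receive(String metadata) {
    writeMetadata.add(metadata);
  }

  void notifyCheckpointComplete() {
    if (writeMetadata.isEmpty()) {
      // Nothing to commit: keep the current instant pending and reuse it.
      return;
    }
    committed.add(pendingInstant);
    writeMetadata.clear();
    pendingInstant = newInstant();
  }

  public static void main(String[] args) {
    InstantReuseSketch coordinator = new InstantReuseSketch();
    coordinator.notifyCheckpointComplete(); // empty checkpoint
    System.out.println(coordinator.lastPendingInstant()); // 001
    coordinator.receive("write-status");
    coordinator.notifyCheckpointComplete();
    System.out.println(coordinator.lastPendingInstant() + " " + coordinator.committed); // 002 [001]
  }
}
```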

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+      }
     }
     bucket.records.add(item);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBucket(DataBucket bucket) {
+  private boolean flushBucket(DataBucket bucket) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;

Review comment:
      But we can still keep it as a safeguard.




