Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/03 09:25:58 UTC
[GitHub] [hudi] yuzhaojing opened a new pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
yuzhaojing opened a new pull request #3029:
URL: https://github.com/apache/hudi/pull/3029
## *Tips*
- *Thank you very much for contributing to Apache Hudi.*
- *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
## What is the purpose of the pull request
*jira: https://issues.apache.org/jira/browse/HUDI-1954*
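As the title says, the bucket should be reset only when the flush succeeds. Below is a minimal sketch of that behavior, not the actual `StreamWriteFunction` code: `Bucket`, `Tracer`, `Flusher`, and `flushAndMaybeReset` are hypothetical stand-ins for illustration, and the only assumption carried over from the change is that the flush call reports success as a `boolean`.

```java
import java.util.ArrayList;
import java.util.List;

public class BucketFlushSketch {

  /** Hypothetical stand-in for a data bucket: buffered records plus their total size. */
  static class Bucket {
    final List<String> records = new ArrayList<>();
    long totalSize = 0;

    void add(String record) {
      records.add(record);
      totalSize += record.length();
    }

    void reset() {
      records.clear();
      totalSize = 0;
    }
  }

  /** Hypothetical stand-in for the tracer that tracks the overall buffer size. */
  static class Tracer {
    long bufferSize = 0;

    void trace(long size) {
      bufferSize += size;
    }

    void countDown(long size) {
      bufferSize -= size;
    }
  }

  /** Hypothetical writer: returns false when the flush could not be performed. */
  interface Flusher {
    boolean flush(Bucket bucket);
  }

  /**
   * Flushes the bucket and releases its buffered state only on success.
   * On failure the records and the traced size are left untouched, so a
   * later attempt can still write them.
   */
  static boolean flushAndMaybeReset(Bucket bucket, Tracer tracer, Flusher flusher) {
    if (flusher.flush(bucket)) {
      tracer.countDown(bucket.totalSize);
      bucket.reset();
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    Bucket bucket = new Bucket();
    Tracer tracer = new Tracer();
    bucket.add("record-1");
    tracer.trace("record-1".length());

    // A failed flush keeps the record buffered.
    boolean first = flushAndMaybeReset(bucket, tracer, b -> false);
    System.out.println("first flush ok=" + first + ", buffered=" + bucket.records.size());

    // A successful flush clears the bucket and the traced size.
    boolean second = flushAndMaybeReset(bucket, tracer, b -> true);
    System.out.println("second flush ok=" + second + ", buffered=" + bucket.records.size());
  }
}
```

Resetting unconditionally would discard the buffered records whenever the flush did not actually write them, which is the situation the sketch avoids.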
## Brief change log
*(for example:)*
- *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
## Verify this pull request
*(Please pick either of the following options)*
This pull request is a trivial rework / code cleanup without any test coverage.
*(or)*
This pull request is already covered by existing tests, such as *(please describe tests)*.
(or)
This change added tests and can be verified as follows:
*(example:)*
- *Added integration tests for end-to-end.*
- *Added HoodieClientWriteTest to verify the change.*
- *Manually verified the change by running a job locally.*
## Committer checklist
- [ ] Has a corresponding JIRA in PR title & commit
- [ ] Commit message is descriptive of the change
- [ ] CI is green
- [ ] Necessary doc changes done or have another open PR
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624
# [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b3ead29) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **increase** coverage by `15.74%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@              Coverage Diff              @@
##             master    #3029       +/-   ##
=============================================
+ Coverage     55.13%   70.88%   +15.74%
+ Complexity     3864      386     -3478
=============================================
  Files           487       54      -433
  Lines         23608     2016    -21592
  Branches       2527      241     -2286
=============================================
- Hits          13016     1429    -11587
+ Misses         9435      454     -8981
+ Partials       1157      133     -1024
```
| Flag | Coverage Δ | |
|---|---|---|
| hudicli | `?` | |
| hudiclient | `?` | |
| hudicommon | `?` | |
| hudiflink | `?` | |
| hudihadoopmr | `?` | |
| hudisparkdatasource | `?` | |
| hudisync | `?` | |
| huditimelineservice | `?` | |
| hudiutilities | `70.88% <ø> (+0.04%)` | :arrow_up: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...rg/apache/hudi/common/model/HoodieRollingStat.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZVJvbGxpbmdTdGF0LmphdmE=) | | |
| [...e/hudi/common/table/timeline/dto/FileSliceDTO.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3RpbWVsaW5lL2R0by9GaWxlU2xpY2VEVE8uamF2YQ==) | | |
| [.../hudi/table/format/cow/CopyOnWriteInputFormat.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvY293L0NvcHlPbldyaXRlSW5wdXRGb3JtYXQuamF2YQ==) | | |
| [.../apache/hudi/exception/HoodieCompactException.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhjZXB0aW9uL0hvb2RpZUNvbXBhY3RFeGNlcHRpb24uamF2YQ==) | | |
| [...udi/common/bootstrap/index/NoOpBootstrapIndex.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2Jvb3RzdHJhcC9pbmRleC9Ob09wQm9vdHN0cmFwSW5kZXguamF2YQ==) | | |
| [...n/java/org/apache/hudi/cli/commands/SparkMain.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL1NwYXJrTWFpbi5qYXZh) | | |
| [...rg/apache/hudi/common/model/HoodieFileGroupId.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZUZpbGVHcm91cElkLmphdmE=) | | |
| [.../org/apache/hudi/common/metrics/LocalRegistry.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21ldHJpY3MvTG9jYWxSZWdpc3RyeS5qYXZh) | | |
| [...che/hudi/common/table/timeline/dto/InstantDTO.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3RpbWVsaW5lL2R0by9JbnN0YW50RFRPLmphdmE=) | | |
| [...va/org/apache/hudi/cli/commands/CleansCommand.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0NsZWFuc0NvbW1hbmQuamF2YQ==) | | |
| ... and [423 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645313144
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
Already logged in method `flushBucket`.
[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645344681
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
Yes, this makes the message clearer.
[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645335801
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
Fine, how about we change the message to `The buffer size hits the threshold {}, but still flush the max size data bucket failed`
[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624
# [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (66d5fed) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `45.85%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@             Coverage Diff              @@
##             master   #3029       +/-   ##
============================================
- Coverage     55.13%   9.27%   -45.86%
+ Complexity     3864      48     -3816
============================================
  Files           487      54      -433
  Lines         23608    2016    -21592
  Branches       2527     241     -2286
============================================
- Hits          13016     187    -12829
+ Misses         9435    1816     -7619
+ Partials       1157      13     -1144
```
| Flag | Coverage Δ | |
|---|---|---|
| hudicli | `?` | |
| hudiclient | `?` | |
| hudicommon | `?` | |
| hudiflink | `?` | |
| hudihadoopmr | `?` | |
| hudisparkdatasource | `?` | |
| hudisync | `?` | |
| huditimelineservice | `?` | |
| hudiutilities | `9.27% <ø> (-61.56%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| ... and [463 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645315126
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
The log in method `flushBucket` only says that the flush failed; this log tells the user that the job will OOM.
[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624
# [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (eb3cf90) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **increase** coverage by `15.69%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@              Coverage Diff              @@
##             master    #3029       +/-   ##
=============================================
+ Coverage     55.13%   70.83%   +15.69%
+ Complexity     3864      385     -3479
=============================================
  Files           487       54      -433
  Lines         23608     2016    -21592
  Branches       2527      241     -2286
=============================================
- Hits          13016     1428    -11588
+ Misses         9435      454     -8981
+ Partials       1157      134     -1023
```
| Flag | Coverage Δ | |
|---|---|---|
| hudicli | `?` | |
| hudiclient | `?` | |
| hudicommon | `?` | |
| hudiflink | `?` | |
| hudihadoopmr | `?` | |
| hudisparkdatasource | `?` | |
| hudisync | `?` | |
| huditimelineservice | `?` | |
| hudiutilities | `70.83% <ø> (ø)` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...able/timeline/versioning/AbstractMigratorBase.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3RpbWVsaW5lL3ZlcnNpb25pbmcvQWJzdHJhY3RNaWdyYXRvckJhc2UuamF2YQ==) | | |
| [...i/common/util/collection/ExternalSpillableMap.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvY29sbGVjdGlvbi9FeHRlcm5hbFNwaWxsYWJsZU1hcC5qYXZh) | | |
| [...i/common/table/view/HoodieTableFileSystemView.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL3ZpZXcvSG9vZGllVGFibGVGaWxlU3lzdGVtVmlldy5qYXZh) | | |
| [...til/jvm/HotSpotMemoryLayoutSpecification64bit.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvanZtL0hvdFNwb3RNZW1vcnlMYXlvdXRTcGVjaWZpY2F0aW9uNjRiaXQuamF2YQ==) | | |
| [.../common/util/queue/IteratorBasedQueueProducer.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvcXVldWUvSXRlcmF0b3JCYXNlZFF1ZXVlUHJvZHVjZXIuamF2YQ==) | | |
| [...java/org/apache/hudi/table/format/FormatUtils.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvRm9ybWF0VXRpbHMuamF2YQ==) | | |
| [...a/org/apache/hudi/avro/HoodieAvroWriteSupport.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvYXZyby9Ib29kaWVBdnJvV3JpdGVTdXBwb3J0LmphdmE=) | | |
| [.../org/apache/hudi/common/model/HoodieWriteStat.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZVdyaXRlU3RhdC5qYXZh) | | |
| [...he/hudi/table/format/cow/AbstractColumnReader.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvY293L0Fic3RyYWN0Q29sdW1uUmVhZGVyLmphdmE=) | | |
| [...udi/common/util/queue/BoundedInMemoryExecutor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvcXVldWUvQm91bmRlZEluTWVtb3J5RXhlY3V0b3IuamF2YQ==) | | |
| ... and [422 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645243751
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
fix
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
ok
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
The log in method `flushBucket` only says that the flush failed; this log tells the user that the job may run out of memory (OOM).
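To make the point about the extra warning concrete, here is a minimal sketch of the buffer-level flush path. It is not the Hudi implementation: `BufferFlushSketch`, `Bucket`, `flushLargest`, the `flusher` predicate and the `warn` callback are all hypothetical names, and it picks the largest bucket with `Stream.max` rather than the sort used in the diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of the buffer-level flush path; names are illustrative, not Hudi's.
final class BufferFlushSketch {
  /** Minimal bucket holding buffered records and their accumulated size. */
  static final class Bucket {
    final List<String> records = new ArrayList<>();
    long totalSize = 0L;

    void add(String record) {
      records.add(record);
      totalSize += record.length();
    }

    void reset() {
      records.clear();
      totalSize = 0L;
    }
  }

  /**
   * Flushes the largest bucket. Returns the number of bytes released,
   * or 0 when there was nothing to flush or the flush failed.
   */
  static long flushLargest(Collection<Bucket> buckets,
                           Predicate<Bucket> flusher,
                           Consumer<String> warn) {
    Optional<Bucket> largest = buckets.stream()
        .max(Comparator.comparingLong((Bucket b) -> b.totalSize));
    if (!largest.isPresent()) {
      return 0L;
    }
    Bucket bucket = largest.get();
    if (flusher.test(bucket)) {
      long released = bucket.totalSize;
      bucket.reset();
      return released;
    }
    // The buffer is over its limit and could not be drained, so memory keeps growing.
    warn.accept("Buffer size exceeds the limit, but flushing the largest bucket failed.");
    return 0L;
  }
}
```

In this sketch a failed flush leaves the records in place, so the warning is the only signal that the buffer stays above its limit.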
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
Review comment:
I agree.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
Yes, this makes the message clearer.
[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624
# [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b3ead29) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `45.85%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## master #3029 +/- ##
============================================
- Coverage 55.13% 9.27% -45.86%
+ Complexity 3864 48 -3816
============================================
Files 487 54 -433
Lines 23608 2016 -21592
Branches 2527 241 -2286
============================================
- Hits 13016 187 -12829
+ Misses 9435 1816 -7619
+ Partials 1157 13 -1144
```
| Flag | Coverage Δ | |
|---|---|---|
| hudicli | `?` | |
| hudiclient | `?` | |
| hudicommon | `?` | |
| hudiflink | `?` | |
| hudihadoopmr | `?` | |
| hudisparkdatasource | `?` | |
| hudisync | `?` | |
| huditimelineservice | `?` | |
| hudiutilities | `9.27% <ø> (-61.56%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| ... and [463 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [hudi] codecov-commenter commented on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624
# [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (eb3cf90) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `45.85%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## master #3029 +/- ##
============================================
- Coverage 55.13% 9.27% -45.86%
+ Complexity 3864 48 -3816
============================================
Files 487 54 -433
Lines 23608 2016 -21592
Branches 2527 241 -2286
============================================
- Hits 13016 187 -12829
+ Misses 9435 1816 -7619
+ Partials 1157 13 -1144
```
| Flag | Coverage Δ | |
|---|---|---|
| hudicli | `?` | |
| hudiclient | `?` | |
| hudicommon | `?` | |
| hudiflink | `?` | |
| hudihadoopmr | `?` | |
| hudisparkdatasource | `?` | |
| hudisync | `?` | |
| huditimelineservice | `?` | |
| hudiutilities | `9.27% <ø> (-61.56%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...lities/schema/SchemaProviderWithPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlcldpdGhQb3N0UHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| ... and [463 more](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645336213
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
Review comment:
Actually, the current code base should never hit this condition now. The coordinator would reuse the last instant if there is no write metadata to commit.
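For readers following the thread, the guard under discussion can be sketched as below. This is only an illustration under assumptions: `InstantGuardSketch`, the `lastPendingInstant` supplier and the `sink` list are hypothetical stand-ins, not the write client API. The sketch shows a flush that reports failure instead of silently returning when no pending instant exists.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch; the supplier stands in for looking up the last pending instant.
final class InstantGuardSketch {
  /**
   * Writes the records against the pending instant.
   * Returns false without writing anything when no instant is available,
   * so the caller knows it must keep the buffered records.
   */
  static boolean flush(List<String> records,
                       Supplier<String> lastPendingInstant,
                       List<String> sink) {
    String instant = lastPendingInstant.get();
    if (instant == null) {
      return false; // nothing to write against; records stay with the caller
    }
    for (String record : records) {
      sink.add(instant + ":" + record);
    }
    return true;
  }
}
```

Even if the condition is not expected to occur, returning a status keeps the caller from discarding records that were never written.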
[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645277018
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
Can we just inline this flag to make it more concise?
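A minimal sketch of the inlined form, assuming hypothetical stand-ins (`FlushGuardSketch`, `Bucket`, `Tracer`, and a `flusher` predicate in place of the real `flushBucket`): the flush result is used directly in the condition, and the size accounting and reset happen only on success.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the write function's buffering logic; not Hudi code.
final class FlushGuardSketch {
  /** Minimal bucket: buffered records plus their accumulated size. */
  static final class Bucket {
    final List<String> records = new ArrayList<>();
    long totalSize = 0L;

    void add(String record) {
      records.add(record);
      totalSize += record.length();
    }

    void reset() {
      records.clear();
      totalSize = 0L;
    }
  }

  /** Tracks the total buffered size across buckets. */
  static final class Tracer {
    long bufferSize = 0L;

    void countDown(long size) {
      bufferSize -= size;
    }
  }

  /**
   * Flushes the bucket and releases its buffer accounting only when the flush succeeds.
   * The flush result feeds the condition directly; no temporary flag is needed.
   */
  static boolean flushAndReset(Bucket bucket, Tracer tracer, Predicate<Bucket> flusher) {
    if (flusher.test(bucket)) {
      tracer.countDown(bucket.totalSize);
      bucket.reset();
      return true;
    }
    return false; // failed flush: keep the records so they are not lost
  }
}
```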
[GitHub] [hudi] garyli1019 merged pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
garyli1019 merged pull request #3029:
URL: https://github.com/apache/hudi/pull/3029
[GitHub] [hudi] codecov-commenter edited a comment on pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#issuecomment-853731624
# [Codecov](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#3029](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5cf821b) into [master](https://codecov.io/gh/apache/hudi/commit/86007e9a13341e1181f940c5f6e5f6dba8eed755?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (86007e9) will **decrease** coverage by `0.01%`.
> The diff coverage is `45.45%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/3029/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@             Coverage Diff              @@
##             master    #3029      +/-   ##
============================================
- Coverage     55.13%   55.12%   -0.02%
- Complexity     3864     3866       +2
============================================
  Files           487      487
  Lines         23608    23613       +5
  Branches       2527     2530       +3
============================================
  Hits          13016    13016
- Misses         9435     9439       +4
- Partials       1157     1158       +1
```
| Flag | Coverage Δ | |
|---|---|---|
| hudicli | `39.55% <ø> (ø)` | |
| hudiclient | `∅ <ø> (∅)` | |
| hudicommon | `50.31% <ø> (-0.01%)` | :arrow_down: |
| hudiflink | `63.25% <45.45%> (-0.09%)` | :arrow_down: |
| hudihadoopmr | `51.54% <ø> (ø)` | |
| hudisparkdatasource | `74.28% <ø> (ø)` | |
| hudisync | `46.44% <ø> (ø)` | |
| huditimelineservice | `64.36% <ø> (ø)` | |
| hudiutilities | `70.88% <ø> (+0.04%)` | :arrow_up: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/hudi/pull/3029?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...java/org/apache/hudi/sink/StreamWriteFunction.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9zaW5rL1N0cmVhbVdyaXRlRnVuY3Rpb24uamF2YQ==) | `84.84% <45.45%> (-1.02%)` | :arrow_down: |
| [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `79.31% <0.00%> (-10.35%)` | :arrow_down: |
| [...va/org/apache/hudi/table/format/FilePathUtils.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvRmlsZVBhdGhVdGlscy5qYXZh) | `66.91% <0.00%> (-1.03%)` | :arrow_down: |
| [.../hudi/table/format/mor/MergeOnReadInputFormat.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS90YWJsZS9mb3JtYXQvbW9yL01lcmdlT25SZWFkSW5wdXRGb3JtYXQuamF2YQ==) | `64.75% <0.00%> (-0.40%)` | :arrow_down: |
| [...ava/org/apache/hudi/exception/HoodieException.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhjZXB0aW9uL0hvb2RpZUV4Y2VwdGlvbi5qYXZh) | `50.00% <0.00%> (ø)` | |
| [...va/org/apache/hudi/configuration/FlinkOptions.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1mbGluay9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaHVkaS9jb25maWd1cmF0aW9uL0ZsaW5rT3B0aW9ucy5qYXZh) | `91.66% <0.00%> (+0.26%)` | :arrow_up: |
| [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `71.18% <0.00%> (+0.33%)` | :arrow_up: |
| [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/hudi/pull/3029/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `79.68% <0.00%> (+1.56%)` | :arrow_up: |
[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645336456
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
Review comment:
But we can still keep it as a protection.
[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645243751
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
Fixed.
[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645304027
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
ok
[GitHub] [hudi] yuzhaojing commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
yuzhaojing commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645344295
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
Review comment:
I agree.
[GitHub] [hudi] danny0405 commented on a change in pull request #3029: [HUDI-1954] Only reset bucket when flush bucket success
Posted by GitBox <gi...@apache.org>.
danny0405 commented on a change in pull request #3029:
URL: https://github.com/apache/hudi/pull/3029#discussion_r645239288
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
flush => flushed
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,36 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ boolean flush = flushBucket(bucket);
+ if (flush) {
+ this.tracer.countDown(bucket.detector.totalSize);
Review comment:
Can we just inline this flag to make it more concise?
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
Already logged in method `flushBucket`.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
Review comment:
Fine, how about we change the message to `The buffer size hits the threshold {}, but still flush the max size data bucket failed`?
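Editor's sketch (not part of the PR): the `flushBuffer` branch in the hunk above picks the largest bucket by sorting every bucket and taking the first element. A single-pass alternative with `Stream.max` is shown below, together with the proposed warning text. The `Bucket` class, `largest`, and `warnMessage` are hypothetical names for illustration only, and `String.format` stands in for the logger's `{}` placeholder so that the example needs no logging dependency.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified stand-in; these are not the Hudi classes. */
final class MaxBucketSketch {
  /** A bucket identified by an id, with its tracked size in bytes. */
  static final class Bucket {
    final String id;
    final long totalSize;

    Bucket(String id, long totalSize) {
      this.id = id;
      this.totalSize = totalSize;
    }
  }

  /** Finds the largest bucket in one pass instead of sorting the whole list. */
  static Optional<Bucket> largest(List<Bucket> buckets) {
    return buckets.stream().max(Comparator.comparingLong((Bucket b) -> b.totalSize));
  }

  /** Builds the proposed warning text; the threshold fills the placeholder. */
  static String warnMessage(long threshold) {
    return String.format(
        "The buffer size hits the threshold %d, but still flush the max size data bucket failed",
        threshold);
  }

  public static void main(String[] args) {
    List<Bucket> buckets = Arrays.asList(
        new Bucket("a", 10), new Bucket("b", 30), new Bucket("c", 20));
    largest(buckets).ifPresent(b -> System.out.println("largest bucket: " + b.id));
    System.out.println(warnMessage(1024));
  }
}
```

Returning an `Optional` also makes the empty-buffer case explicit, which `sortedBuckets.get(0)` would surface as an exception.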
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
Review comment:
Actually, the current code base should never hit this condition now. The coordinator would reuse the last instant if there is no write metadata to commit.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
##########
@@ -470,30 +470,34 @@ private void bufferRecord(HoodieRecord<?> value) {
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("BufferSize size is larger than maxBufferSize, but flushBucket still failed.");
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
Review comment:
But we can still keep it as a protection.
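Editor's sketch (not part of the PR): the change under review makes `flushBucket` report whether it wrote anything, so that the caller counts down the buffer size and resets the bucket only on success. A minimal, self-contained illustration of that pattern follows. Every class, field, and method name here is hypothetical and heavily simplified; a `null` `pendingInstant` models the "no inflight instant" guard that the reviewers agreed to keep as a protection.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the buffering logic; not the Hudi classes. */
final class FlushOnSuccessSketch {
  /** Buffered records plus their tracked size in bytes. */
  static final class Bucket {
    final List<String> records = new ArrayList<>();
    long totalSize;

    void reset() {
      records.clear();
      totalSize = 0;
    }
  }

  long bufferedBytes;      // total bytes held across buckets
  String pendingInstant;   // null models "no inflight instant"
  final List<String> written = new ArrayList<>();

  /** Adds a record to the bucket and to the overall byte count. */
  void buffer(Bucket bucket, String record) {
    bucket.records.add(record);
    bucket.totalSize += record.length();
    bufferedBytes += record.length();
  }

  /** Returns true only if the records were actually written. */
  boolean flushBucket(Bucket bucket) {
    if (pendingInstant == null) {
      // Guard kept as a protection: nothing to write against, so report failure.
      return false;
    }
    written.addAll(bucket.records);
    return true;
  }

  /** Counts down and resets the bucket only when the flush succeeded. */
  void flushAndMaybeReset(Bucket bucket) {
    if (flushBucket(bucket)) {
      bufferedBytes -= bucket.totalSize;
      bucket.reset();
    }
  }

  public static void main(String[] args) {
    FlushOnSuccessSketch sink = new FlushOnSuccessSketch();
    Bucket bucket = new Bucket();
    sink.buffer(bucket, "abc");

    sink.flushAndMaybeReset(bucket);   // no pending instant: records are kept
    System.out.println("kept after failed flush: " + bucket.records.size());

    sink.pendingInstant = "001";
    sink.flushAndMaybeReset(bucket);   // succeeds: bucket is reset
    System.out.println("kept after successful flush: " + bucket.records.size());
  }
}
```

With the unconditional reset from before the change, the first call would have cleared the bucket even though nothing was written, dropping the buffered record.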