Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/19 13:55:42 UTC

[GitHub] [hudi] wxplovecc opened a new pull request, #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

wxplovecc opened a new pull request, #5362:
URL: https://github.com/apache/hudi/pull/5362

   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contribute/how-to-contribute before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   Improve Flink bulk_insert performance for partitioned tables.
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5362:
URL: https://github.com/apache/hudi/pull/5362#issuecomment-1102693876

   ## CI report:
   
   * fabdbf1c87824b1d39aa211f3eaea867ff96a2cc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5362:
URL: https://github.com/apache/hudi/pull/5362#issuecomment-1102697821

   ## CI report:
   
   * fabdbf1c87824b1d39aa211f3eaea867ff96a2cc Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8142) 
   




[GitHub] [hudi] wxplovecc commented on a diff in pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
wxplovecc commented on code in PR #5362:
URL: https://github.com/apache/hudi/pull/5362#discussion_r853851616


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -102,9 +102,15 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
 
       Map<String, String> bucketIdToFileId = new HashMap<>();
-      dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
-          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+      if (OptionsResolver.isPartitionedTable(conf)) {
+        dataStream = dataStream.keyBy(keyGen::getPartitionPath)
+                .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+      } else {
+        dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
+                .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)

Review Comment:
   > Why shuffle the input by partition path first?
   
   - Shuffling with the custom partitioner does not work well for a partitioned table with few buckets (e.g. partitioned by day with 1 bucket): it uses only 1 write task.
   - It also makes sure that the same partition is processed in the same task, which avoids duplicated fileIds.
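
The two points above can be illustrated with a small standalone sketch. It does not use Flink or Hudi: `taskByBucket` and `taskByPartitionPath` are hypothetical stand-ins for the custom partitioner and for `keyBy(keyGen::getPartitionPath)`, and the plain modulo routing is an assumption made for illustration (Flink's real `keyBy` hashes keys into key groups).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ShuffleSketch {

    // Hypothetical stand-in for a bucket-based custom partitioner:
    // hash the record key to a bucket id, then map the bucket id to a task.
    static int taskByBucket(String recordKey, int numBuckets, int parallelism) {
        int bucketId = Math.floorMod(recordKey.hashCode(), numBuckets);
        return bucketId % parallelism;
    }

    // Hypothetical stand-in for keying by partition path: all records of one
    // partition are routed to the same task (simplified to a plain modulo).
    static int taskByPartitionPath(String partitionPath, int parallelism) {
        return Math.floorMod(partitionPath.hashCode(), parallelism);
    }

    // Each record is {recordKey, partitionPath}.
    static Set<Integer> tasksUsedByBucket(List<String[]> records, int numBuckets, int parallelism) {
        Set<Integer> tasks = new HashSet<>();
        for (String[] record : records) {
            tasks.add(taskByBucket(record[0], numBuckets, parallelism));
        }
        return tasks;
    }

    static Set<Integer> tasksUsedByPartitionPath(List<String[]> records, int parallelism) {
        Set<Integer> tasks = new HashSet<>();
        for (String[] record : records) {
            tasks.add(taskByPartitionPath(record[1], parallelism));
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Table partitioned by day; the values are made up for the example.
        List<String[]> records = Arrays.asList(
            new String[] {"id1", "2022-04-17"},
            new String[] {"id2", "2022-04-18"},
            new String[] {"id3", "2022-04-19"});
        int parallelism = 4;

        // With 1 bucket every record gets bucket id 0, so only task 0 is used.
        System.out.println("bucket shuffle, 1 bucket: " + tasksUsedByBucket(records, 1, parallelism));
        // Keyed by partition path, different days can land on different tasks,
        // while one day always lands on a single task.
        System.out.println("partition-path shuffle:   " + tasksUsedByPartitionPath(records, parallelism));
    }
}
```

In this simplified model the bucket-based routing with a single bucket always collapses to one task, whereas routing by partition path can spread distinct partitions over several tasks while keeping each partition on exactly one of them.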





[GitHub] [hudi] wxplovecc commented on pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
wxplovecc commented on PR #5362:
URL: https://github.com/apache/hudi/pull/5362#issuecomment-1134358918

   Closing this since https://github.com/apache/hudi/pull/5590 has fixed it.




[GitHub] [hudi] hudi-bot commented on pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5362:
URL: https://github.com/apache/hudi/pull/5362#issuecomment-1103465728

   ## CI report:
   
   * fabdbf1c87824b1d39aa211f3eaea867ff96a2cc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8142) 
   * 4426590f9cb6609b41fbb923e674b573fd7f79ad Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8158) 
   




[GitHub] [hudi] hudi-bot commented on pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5362:
URL: https://github.com/apache/hudi/pull/5362#issuecomment-1103506131

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fabdbf1c87824b1d39aa211f3eaea867ff96a2cc",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8142",
       "triggerID" : "fabdbf1c87824b1d39aa211f3eaea867ff96a2cc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4426590f9cb6609b41fbb923e674b573fd7f79ad",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8158",
       "triggerID" : "4426590f9cb6609b41fbb923e674b573fd7f79ad",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4426590f9cb6609b41fbb923e674b573fd7f79ad Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8158) 
   




[GitHub] [hudi] wxplovecc closed pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
wxplovecc closed pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table
URL: https://github.com/apache/hudi/pull/5362




[GitHub] [hudi] hudi-bot commented on pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5362:
URL: https://github.com/apache/hudi/pull/5362#issuecomment-1103464363

   ## CI report:
   
   * fabdbf1c87824b1d39aa211f3eaea867ff96a2cc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8142) 
   * 4426590f9cb6609b41fbb923e674b573fd7f79ad UNKNOWN
   




[GitHub] [hudi] danny0405 commented on a diff in pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
danny0405 commented on code in PR #5362:
URL: https://github.com/apache/hudi/pull/5362#discussion_r853796060


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -102,9 +102,15 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
 
       Map<String, String> bucketIdToFileId = new HashMap<>();
-      dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
-          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
-          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+      if (OptionsResolver.isPartitionedTable(conf)) {
+        dataStream = dataStream.keyBy(keyGen::getPartitionPath)
+                .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+      } else {
+        dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
+                .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)

Review Comment:
   Why shuffle the input by partition path first?





[GitHub] [hudi] hudi-bot commented on pull request #5362: [HUDI-3918] Improve flink bulk_insert perform for partitioned table

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #5362:
URL: https://github.com/apache/hudi/pull/5362#issuecomment-1102780595

   ## CI report:
   
   * fabdbf1c87824b1d39aa211f3eaea867ff96a2cc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=8142) 
   

