Posted to commits@hudi.apache.org by "chenfengLiu (Jira)" <ji...@apache.org> on 2022/06/28 12:00:00 UTC

[jira] [Created] (HUDI-4337) Data skew in flink write stream

[chenfengLiu](https://issues.apache.org/jira/secure/ViewProfile.jspa?name=liufangqi) **created** an issue

[Apache Hudi](https://issues.apache.org/jira/browse/HUDI) / [HUDI-4337](https://issues.apache.org/jira/browse/HUDI-4337): [Data skew in flink write stream](https://issues.apache.org/jira/browse/HUDI-4337)

| Field | Value |
|---|---|
| Issue Type | Improvement |
| Assignee | Unassigned |
| Attachments | image-2022-06-28-18-40-04-298.png, image-2022-06-28-18-40-39-455.png |
| Components | flink |
| Created | 28/Jun/22 11:59 |
| Fix Versions | 0.11.0 |
| Labels | flink |
| Priority | Major |
| Reporter | [chenfengLiu](https://issues.apache.org/jira/secure/ViewProfile.jspa?name=liufangqi) |

When we create a Flink DataStream to write a Hudi table, we usually use the
org.apache.hudi.streamer.HoodieFlinkStreamer class.

The generated Flink DAG contains two required operators, BucketAssignFunction
and StreamWriteFunction, plus some optional operators. BucketAssignFunction
assigns a bucket to each incoming record, and StreamWriteFunction then
processes the stream keyed by bucket.

    
    
    DataStream<Object> pipeline = hoodieDataStream
        // Key-by record key, to avoid multiple subtasks writing to a bucket at the same time
        .keyBy(HoodieRecord::getRecordKey)
        .transform(
            "bucket_assigner",
            TypeInformation.of(HoodieRecord.class),
            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
        .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
        .uid("uid_bucket_assigner")
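        // Shuffle by fileId (bucket id). keyBy hash-partitions the fileIds, so the write
        // subtask that receives a bucket is decided by the hash of its fileId, not by the
        // assigner subtask that created it.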
        .keyBy(record -> record.getCurrentLocation().getFileId())
        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
        .uid("uid_hoodie_stream_write")
        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); 

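To reduce the number of small files, BucketAssignFunction rolls over to a new
bucket every 500,000 records by default. So most of the time each
BucketAssignFunction subtask holds one active bucket, and the number of active
buckets equals the BucketAssignFunction parallelism. BucketAssignFunction
usually has the same parallelism as StreamWriteFunction, but we can't guarantee
that each bucket is sent to a different StreamWriteFunction subtask: the keyBy
on fileId hashes every bucket to a write subtask, so several buckets can land on
the same subtask while other subtasks receive none.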
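To make the collision argument concrete, here is a small self-contained Java model. It is hypothetical, not Hudi or Flink code. It assumes each of n assigner subtasks holds exactly one active bucket and that the n fileIds are hash-partitioned onto n write subtasks. `Math.floorMod(fileId.hashCode(), n)` is a simplified stand-in for Flink's murmur-hash key-group assignment, and seeded random UUID strings stand in for fileIds. The helper `expectedIdle(n)` computes n(1 - 1/n)^n, the expected number of write subtasks that receive no bucket under uniform hashing.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.UUID;

/**
 * Hypothetical model of the skew described in HUDI-4337 (not Hudi or Flink code).
 * n assigner subtasks each hold one active bucket; the n fileIds are hash-partitioned
 * onto n write subtasks. Math.floorMod(hashCode, n) is a simplified stand-in for
 * Flink's murmur-hash/key-group assignment.
 */
public class BucketSkewSketch {

    /** Returns how many buckets each of the n write subtasks receives. */
    static int[] bucketsPerWriter(int n, long seed) {
        Random rnd = new Random(seed);
        int[] buckets = new int[n];
        for (int assigner = 0; assigner < n; assigner++) {
            // A seeded random UUID string stands in for the fileId of this assigner's bucket.
            String fileId = new UUID(rnd.nextLong(), rnd.nextLong()).toString();
            buckets[Math.floorMod(fileId.hashCode(), n)]++;
        }
        return buckets;
    }

    /** Expected number of write subtasks that receive no bucket under uniform hashing. */
    static double expectedIdle(int n) {
        // Each subtask is missed by all n buckets with probability (1 - 1/n)^n.
        return n * Math.pow(1.0 - 1.0 / n, n);
    }

    public static void main(String[] args) {
        int n = 8;
        int[] buckets = bucketsPerWriter(n, 42L);
        int idle = 0;
        int busiest = 0;
        for (int b : buckets) {
            if (b == 0) {
                idle++;
            }
            busiest = Math.max(busiest, b);
        }
        System.out.println("buckets per write subtask: " + Arrays.toString(buckets));
        System.out.printf("idle subtasks: %d, busiest subtask holds %d buckets%n", idle, busiest);
        System.out.printf("expected idle subtasks for n=%d: %.2f%n", n, expectedIdle(n));
    }
}
```

For n = 8 the formula gives about 2.75 idle write subtasks, and the idle fraction approaches 1/e (about 37%) as n grows. So with equal parallelism, a sizable share of write subtasks typically sits idle while others handle two or more buckets.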

As a result, we end up with data skew like this:

![13045809_image-2022-06-28-18-40-04-298.png](https://issues.apache.org/jira/secure/attachment/13045809/13045809_image-2022-06-28-18-40-04-298.png)

![13045808_image-2022-06-28-18-40-39-455.png](https://issues.apache.org/jira/secure/attachment/13045808/13045808_image-2022-06-28-18-40-39-455.png)

The data skew can cause backpressure, which in turn makes checkpoints time out.
This matters because the Flink Hudi write pipeline depends on checkpoint
completion to commit each instant.

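I propose chaining the two operators when BucketAssignFunction's parallelism
equals StreamWriteFunction's parallelism, i.e. connecting them with a forward
edge instead of the keyBy on fileId, so that each bucket is written by the
write subtask chained to the assigner subtask that owns it.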
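The proposal can be sketched as a routing decision. The Java model below is hypothetical, not Hudi or Flink code. When the two parallelisms match, each assigner subtask forwards its records to the write subtask with the same index, which is the kind of connection Flink can chain. In the DataStream API this would presumably mean calling `forward()` instead of `keyBy` on fileId; that wiring is an assumption, not the reporter's patch. Otherwise the model falls back to hashing by fileId. It assumes insert-only traffic in which each assigner subtask feeds one active bucket of its own. Updates routed to file groups created by other subtasks are not modeled.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.UUID;

/**
 * Hypothetical model of the proposal in HUDI-4337 (not Hudi or Flink code).
 * Assumes insert-only traffic: each assigner subtask feeds one active bucket of its own.
 */
public class ForwardVsHashSketch {

    enum Edge { FORWARD, HASH_BY_FILE_ID }

    /** Proposed rule: forward (chainable) only when both operators have the same parallelism. */
    static Edge chooseEdge(int assignTasks, int writeTasks) {
        return assignTasks == writeTasks ? Edge.FORWARD : Edge.HASH_BY_FILE_ID;
    }

    /** Seeded random UUID strings standing in for one fileId per assigner subtask. */
    static String[] fileIds(int tasks, long seed) {
        Random rnd = new Random(seed);
        String[] ids = new String[tasks];
        for (int i = 0; i < tasks; i++) {
            ids[i] = new UUID(rnd.nextLong(), rnd.nextLong()).toString();
        }
        return ids;
    }

    /** Write subtask that receives the bucket owned by assigner subtask `origin`. */
    static int target(Edge edge, int origin, String fileId, int tasks) {
        // floorMod(hashCode) is a simplified stand-in for Flink's key-group assignment.
        return edge == Edge.FORWARD ? origin : Math.floorMod(fileId.hashCode(), tasks);
    }

    /** Records received by each write subtask when every assigner emits recordsPerAssigner. */
    static long[] loadPerWriter(Edge edge, String[] ids, long recordsPerAssigner) {
        long[] load = new long[ids.length];
        for (int origin = 0; origin < ids.length; origin++) {
            load[target(edge, origin, ids[origin], ids.length)] += recordsPerAssigner;
        }
        return load;
    }

    public static void main(String[] args) {
        int tasks = 8;
        String[] ids = fileIds(tasks, 42L);
        System.out.println("edge chosen for " + tasks + " -> " + tasks + ": " + chooseEdge(tasks, tasks));
        for (Edge edge : Edge.values()) {
            System.out.println(edge + ": " + Arrays.toString(loadPerWriter(edge, ids, 500_000L)));
        }
    }
}
```

Under FORWARD every write subtask receives exactly one assigner's records, so the write load is as even as the upstream keyBy on record key. Under HASH_BY_FILE_ID it depends on how the fileIds happen to hash.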

In my testing, this greatly improved the performance and stability of the
write job: it resolves the data skew and reduces network overhead.
  