Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/25 08:15:46 UTC

[GitHub] [iceberg] zhengchar opened a new pull request #4228: The Data Skew Problem on FlinkSink

zhengchar opened a new pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228


   Hi,
   
   I tried to load 1 TB of data from TiDB into Iceberg using Flink. The Iceberg table is partitioned into 128 buckets.
   
   I found a data skew problem in the FlinkSQL IcebergWriter stage. We set the parallelism of this stage to 128, but only 49 TaskManagers had tasks that actually processed data; the others finished very quickly.
   
   For a bucket-partitioned table, Flink distributes data with a 'keyBy' operator, and its hash policy can cause data skew. I wrote a
   custom partition function that evenly assigns the data of each table partition to a task on a TaskManager.
   
   ![image](https://user-images.githubusercontent.com/58673451/155674884-440e1b1b-0c6c-4c53-a7c5-0be80d3aca2b.png)
   
   According to my testing, this function gives every TaskManager a task to process and cuts the data load time from 96 min to 38 min at a parallelism of 64.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhengchar commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
zhengchar commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1052224304


   > There is a recent Slack thread for the same issue where hash distribution leads to skewed shuffling (for bucketing partitions and probably other partition spec too): https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1645676203340179
   > 
   > I don't necessarily agree with the solution provided in this PR. We can provide some general solution in FlinkSink.
   > 
   > For bucketing partitions, we can implement a custom partitioner to shuffle data by the bucketing value to the downstream tasks/channels: `bucket(key) % downstream operator parallelism`.
   > 
   > I don't know if it makes sense to add `bucketing` enum value to `DistributionMode` or we can make this config only for FlinkSink.
   > 
   > cc @openinx @rdblue @kbendick
   
   Hi Steven,
   
   Glad to have this discussion with you!
   
   I think there are three key aspects to this problem:
   - First, each partition should be written by only one task, to avoid the small data files problem.
   - Second, data is currently shuffled by the bucket partition key.
   - Third, bucket is a hash-based policy, whereas 'truncate' and 'identity' partitions have no hash attribute.
   
   I agree with you that we need a new method that maps partition data to tasks evenly, rather than a hash function. I think a new KeySelector logic could apply to the bucket partition case, but it would not fit other partition specs (identity or truncate). Or could we make this config available only for FlinkSink with a 'bucket' partition spec?
   
   BTW, could you please help add me to the Slack discussion mentioned above? Thanks very much!




[GitHub] [iceberg] stevenzwu commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1061026606


   @zhengchar can you share your code snippet? I thought the custom partitioner (with NONE distribution mode) should work for your case as well. I'm not saying it is the general solution; I'm trying to understand why it doesn't work for you, or whether there are unique conditions in your use case that we missed.




[GitHub] [iceberg] stevenzwu commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1051648085


   There is a recent Slack thread for the same issue where hash distribution leads to skewed shuffling (for bucketing partitions and probably other partition spec too): https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1645676203340179
   
   I don't necessarily agree with the solution provided in this PR. We can provide some general solution in FlinkSink.
   
   For bucketing partitions, we can implement a custom partitioner to shuffle data by the bucketing value to the downstream tasks/channels: `bucket(key) % downstream operator parallelism`.
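
For illustration, a minimal Java sketch of the `bucket(key) % downstream operator parallelism` rule mentioned above. It assumes the Iceberg bucket id has already been computed for each row; `BucketChannelSketch`, `channelFor`, and `bucketsPerChannel` are hypothetical names, not FlinkSink or Iceberg APIs. It only counts how many buckets each downstream subtask would receive.

```java
import java.util.Arrays;

class BucketChannelSketch {
    // Hypothetical helper: maps an already-computed bucket id to a downstream subtask index.
    static int channelFor(int bucketId, int parallelism) {
        return bucketId % parallelism;
    }

    // Counts how many buckets land on each downstream subtask.
    static int[] bucketsPerChannel(int numBuckets, int parallelism) {
        int[] counts = new int[parallelism];
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            counts[channelFor(bucket, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 128 buckets written with a parallelism of 64, as in the PR description.
        System.out.println(Arrays.toString(bucketsPerChannel(128, 64)));
    }
}
```

With 128 buckets and a parallelism of 64, every subtask receives exactly two buckets. When the parallelism does not divide the bucket count evenly, some subtasks receive one more bucket than others.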
   
   I don't know if it makes sense to add `bucketing` enum value to `DistributionMode` or we can make this config only for FlinkSink.
   
   cc @openinx @rdblue @kbendick 




[GitHub] [iceberg] zhengchar commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
zhengchar commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1057577620


   > I just copied the solution from Slack here for further searching:
   > 
   > https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1646033885013799?thread_ts=1645676203.340179&cid=C025PH0G1D4
   > 
   > > hi [@Steven Wu](https://apache-iceberg.slack.com/team/U02CNF23RTL) [@Kyle Bendickson](https://apache-iceberg.slack.com/team/U0260PLFVBM), thanks for your help.
   > > It really helped me a lot.
   > > I wrote a custom partitioner that is consistent with Iceberg's bucketing algorithm,
   > > so I disabled the shuffling (DistributionMode.HASH). It can still handle small files.
   > 
   > ```scala
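   > // Shuffle by the same murmur3 hash that Iceberg's bucket transform applies to strings.
   > val finalStream = kafkaStream.partitionCustom(new Partitioner[String]() {
   >               val hash = Hashing.murmur3_32()
   >               override def partition(key: String, numPartitions: Int): Int = {
   >                 val res = hash.hashString(key, StandardCharsets.UTF_8).asInt()
   >                 // Mask the sign bit so the channel index is never negative.
   >                 (Integer.MAX_VALUE & res) % numPartitions
   >               }
   >             },value=>value.getField(0).toString)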
   > FlinkSink.forRow(finalStream.javaStream,tableSchema)
   >           .tableLoader(loader)
   >           .writeParallelism(sinkParallelism)
   >           .distributionMode(DistributionMode.NONE)
   >           .build()
   > ```
   
   Hi @openinx,
   
   Thanks for your explanation. 
   
   I have tried the solution above, and the data skew problem is still there in the IcebergWriter stage.
   
   In my opinion, there are two points:
   - We need a method that distributes data evenly across tasks, but I don't think the HASH method is a good fit. So I tried calculating a weight value for every partition and then using a modulo to assign the partitions to tasks.
   - If mode 'NONE' is used, I think a task will handle data from multiple partitions, which can cause OOM during the pos/eq delete procedure and may lead to the small files problem. This is also a resource distribution problem.




[GitHub] [iceberg] kbendick commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1053942934


   > There is a recent Slack thread for the same issue where hash distribution leads to skewed shuffling (for bucketing partitions and probably other partition spec too): https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1645676203340179
   > 
   > I don't necessarily agree with the solution provided in this PR. We can provide some general solution in FlinkSink.
   > 
   > For bucketing partitions, we can implement a custom partitioner to shuffle data by the bucketing value to the downstream tasks/channels: `bucket(key) % downstream operator parallelism`.
   > 
   > I don't know if it makes sense to add `bucketing` enum value to `DistributionMode` or we can make this config only for FlinkSink.
   > 
   > cc @openinx @rdblue @kbendick
   
   I agree with @stevenzwu; I don't necessarily think this is the appropriate solution to the concern. Please do join the Iceberg Slack @zhengchar. As Steven mentioned, this has come up very recently there.
   
   I wouldn't add something so specific like `bucketing` to `DistributionMode` (at least as proposed here), I would probably start by making a config to the `FlinkSink` and then trying to solve the problem somewhat more generally with respect to partitioning and required distribution - possibly making that opt-in / opt-out via the flink sink config in case it's not an issue for users. But having a `DistributionMode` of `bucketing` seems a little too situationally specific given the preconditions outlined here (only bucketing partition is used, etc). I agree with the idea of a custom partitioner to preshuffle the data by some (potentially configurable) factor of downstream parallelism (be that tasks or channels).
   
   I'd love to continue the discussion on Slack (or on the dev list, but Slack is great for casual async discussion and the Iceberg Slack is very active).




[GitHub] [iceberg] openinx commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1054931334


   I agree with @stevenzwu that we need a general solution to fix this data skew issue.




[GitHub] [iceberg] rdblue commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1059367787


   After talking with Steven, I think we should go ahead and detect the case where we can use bucket values for distribution directly. There should be no need to add a mode to the table property for that.




[GitHub] [iceberg] openinx commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1054912982


   Thanks @zhengchar for bringing this interesting issue here, and thanks @stevenzwu  and @kbendick for providing the slack context. 
   
   In essence, the current `PartitionKeySelector` generates a `String` **key** that is composed of a `bucket` integer value in a narrow range (usually `[0, bucketNum)`). Then the `keyBy` operator hashes that **key** again, computing `String#hashCode % parallelism`, to choose the preferred downstream channel.
   
   The first hash narrows the value range, so the second hash produces a large number of collisions, with many buckets (and therefore many rows) landing on the same downstream channel.
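
To make the double-hash effect concrete, here is a small Java sketch under simplifying assumptions. It models the second hash exactly as described above (`String#hashCode % parallelism`) and assumes the key looks like `id_bucket=N`; both the key format and the class `DoubleHashSkewSketch` are hypothetical. It compares how many subtasks receive data when keys are re-hashed as strings versus when the bucket id is used directly.

```java
import java.util.HashSet;
import java.util.Set;

class DoubleHashSkewSketch {
    // Simplified model of the second hash: String#hashCode modulo parallelism.
    static int hashedChannel(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Number of distinct subtasks that receive data when bucket keys are hashed as strings.
    static int busyChannelsHashed(int numBuckets, int parallelism) {
        Set<Integer> channels = new HashSet<>();
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            // Assumed key format; the real partition key string may differ.
            channels.add(hashedChannel("id_bucket=" + bucket, parallelism));
        }
        return channels.size();
    }

    // Number of distinct subtasks that receive data when the bucket id picks the channel.
    static int busyChannelsDirect(int numBuckets, int parallelism) {
        Set<Integer> channels = new HashSet<>();
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            channels.add(bucket % parallelism);
        }
        return channels.size();
    }

    public static void main(String[] args) {
        System.out.println("hashed: " + busyChannelsHashed(128, 128));
        System.out.println("direct: " + busyChannelsDirect(128, 128));
    }
}
```

In this simplified model the hashed route leaves some subtasks idle (for example, the keys for buckets 10 and 54 have `hashCode` values that differ by exactly 128, so they share a channel), while the direct route keeps all 128 busy. Flink's real `keyBy` routes keys through key groups rather than a plain modulo, so the exact numbers in a job will differ.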
   
   
   
   
   




[GitHub] [iceberg] openinx commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1054942049


   I just copied the solution from Slack here for further searching: 
   
   https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1646033885013799?thread_ts=1645676203.340179&cid=C025PH0G1D4
   
   > hi [@Steven Wu](https://apache-iceberg.slack.com/team/U02CNF23RTL) [@Kyle Bendickson](https://apache-iceberg.slack.com/team/U0260PLFVBM), thanks for your help.
   > It really helped me a lot.
   > I wrote a custom partitioner that is consistent with Iceberg's bucketing algorithm,
   > so I disabled the shuffling (DistributionMode.HASH). It can still handle small files.
   
   ```scala
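   // Shuffle by the same murmur3 hash that Iceberg's bucket transform applies to strings.
   val finalStream = kafkaStream.partitionCustom(new Partitioner[String]() {
                 val hash = Hashing.murmur3_32()
                 override def partition(key: String, numPartitions: Int): Int = {
                   val res = hash.hashString(key, StandardCharsets.UTF_8).asInt()
                   // Mask the sign bit so the channel index is never negative.
                   (Integer.MAX_VALUE & res) % numPartitions
                 }
               },value=>value.getField(0).toString)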
   FlinkSink.forRow(finalStream.javaStream,tableSchema)
             .tableLoader(loader)
             .writeParallelism(sinkParallelism)
             .distributionMode(DistributionMode.NONE)
             .build()
   ```
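
A side note on the `(Integer.MAX_VALUE & res) % numPartitions` line in the snippet above: a small Java sketch of why masking the sign bit is safer than `Math.abs`. The class and method names are hypothetical and only for illustration.

```java
class NonNegativeModSketch {
    // Same masking as the snippet above: clear the sign bit, then take the modulus.
    static int maskedIndex(int hash, int numPartitions) {
        return (Integer.MAX_VALUE & hash) % numPartitions;
    }

    // A common alternative that breaks for Integer.MIN_VALUE.
    static int absIndex(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println("masked: " + maskedIndex(Integer.MIN_VALUE, 7));
        System.out.println("abs:    " + absIndex(Integer.MIN_VALUE, 7));
    }
}
```

`Math.abs(Integer.MIN_VALUE)` is still negative, so the `abs` variant can return a negative index, which is not a valid channel. The masked variant always stays in `[0, numPartitions)`.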




[GitHub] [iceberg] stevenzwu commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1058793833


   @zhengchar We all agree that HASH distribution is not a good fit for bucketing tables. In the sample code from the Slack thread, hash distribution is disabled and a custom partitioner is used to shuffle the data matching the bucketing function in Iceberg.
   
   How is the output/destination table partitioned? I'm trying to understand why the custom partitioner doesn't work for your scenario.
   




[GitHub] [iceberg] zhengchar commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
zhengchar commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1060321377


   > @zhengchar We all agree that HASH distribution is not a good fit for bucketing tables. In the sample code from the Slack thread, hash distribution is disabled and a custom partitioner is used to shuffle the data matching the bucketing function in Iceberg.
   > 
   > how is the output/destination table partitioned? Trying to understand why the custom partitioner doesn't work for your scenario.
   
   Hi Steven,
   
   As described above, my destination table is just a bucket[64] table.




[GitHub] [iceberg] stevenzwu commented on pull request #4228: Flink: The Data Skew Problem on FlinkSink

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #4228:
URL: https://github.com/apache/iceberg/pull/4228#issuecomment-1052992346


   @zhengchar please follow the instructions for the Slack invite: https://iceberg.apache.org/community/

