You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/12 01:17:34 UTC

[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555444419



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       +1 on more generalized semantics. Current option only works if the data is relatively evenly distrubuted across table partitions. Otherwise, heavy data skew can be problematic for writer. The other problem is that effective writer parallelism now is limited by the number of partition values. Let's say the writer parallelism is 100, but the number of unique partition values are only 10. Then only 10 writer subtasks will get the data.
   
   I will add some notes for the streaming write mode. In a streaming job, it is probably impossible to do true sorting. Instead, what can be useful is some sort of "groupBy/bucketing" shuffle in the streaming sink. It can help with reducing too many concurrent open files per writer and improving read performance (predicate pushdown) with better data locality.
   
   E.g., a table is partition by (event_date, country). Without the shuffle, each writer task can write to ~200 files/countries. However, a simple keyBy is also problematic as it can produce heavy data skew for countries like US. Instead, we should calculate stats for each bucket/country and distribute the data based on the weight of each bucket. E.g., we may allocate 100 downstream subtasks for US, while allocating 1 downstream subtask for multiple small countries (like bin packing).
   
   This can also be extended to non-partition column (as logical partitioning), which can improve read performance with filtering. Similar to the above example with the tweak that country is not a partition column anymore. groupBy/bucketing shuffle can help improve data locality. 
   
   I was thinking about a groupBy operator where each subtask (running in taskmanager) can constantly report local statistics to operator coordinator (running in jobmanager), which then does the global aggregation and notify subtasks with the globally aggregated stats.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org