You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/16 19:34:43 UTC

[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I think we're all in agreement to consider behavior across engines and consider settings that can be used anywhere. One of the major differences between batch and streaming is that it would be expensive and unusual to sort the records processed by a task, so actually producing records sorted locally within a task isn't something we should expect. My guess is that Flink won't support either local or global sort -- global is just task-level sort with a shared range partitioner.
   
   We do have a need to shuffle data to the write tasks in Flink. Shuffling by partitions is a good idea to start with because it covers the cases where hash partitioning is balanced. Skew in the partitions that @stevenzwu notes is a concern, but I'm also interested in handling skew in non-partition columns. If downstream reads are likely to filter by `country` in Steven's example, then clustering data by country in Flink is a good idea. Spark uses global ordering to handle this skew, which will estimate ranges of the sort keys and produce a partitioner config that balances the data.
   
   For Flink, what makes sense to me is to have options for the key-by operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` -- where `range(sort key)` is determined using a method like Steven suggests. These distribution options correspond to the ones we plan to support in Spark, just without the task-level sort.
   
   I propose using a table property, `write.distribution.mode`, in both engines with values:
   * `none`: do not shuffle rows
   * `partition`: shuffle rows by `hash(partition)`
   * `sort`: shuffle rows by `range(sort key)` using a ranges provided by a skew calculation or from table metrics
   
   I considered a similar mode for sort, but Flink will probably ignore it and in Spark we are making it implicit: if the table's sort order is defined, then request a sort. So there is no need for an equivalent sort mode property.
   
   If we go with this, then we have one setting that works in both Flink and Spark. In Flink the property controls the `keyBy` behavior only, and in Spark it is combined with sort order to give the options that we've discussed:
   
   | Spark | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | distribute by partition key | range distribute by partition key |
   | ordered | no distribution, locally sorted | distribute by partition key, locally sorted | globally sorted |
   
   I think Flink would use a different sort key depending on whether the table order is set: it would range distribute by partition key or sort key. Here's an equivalent table:
   
   | Flink | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | distribute by partition key | range distribute by partition key |
   | ordered | no distribution or ordering | distribute by partition key | range distribute by sort key |




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org