You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/11 07:31:08 UTC

[GitHub] [iceberg] openinx opened a new pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

openinx opened a new pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064


   Provides a switch in org.apache.flink.sink.FlinkSink to shuffle by partition key, so that each partition/bucket will be wrote by only one task. That will reduce lots of small files in partitioned fanout write policy for flink sink.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559716019



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -209,7 +224,16 @@ public Builder equalityFieldColumns(List<String> columns) {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
+      // Convert the flink requested table schema to flink row type.
+      RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+      // Shuffle by partition key if possible.
+      if (shouldShuffleByPartition(table.properties()) && !table.spec().isUnpartitioned()) {
+        rowDataInput = rowDataInput.keyBy(new PartitionKeySelector(table.spec(), table.schema(), flinkRowType));

Review comment:
       Hashing over very large cardinality (like `user_id`) is fine. But I should caution about `keyBy` over partition key which has relatively small cardinality (like `bucket(user_id)`). It can get pretty uneven distribution. 
   ```
   {0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37, 11=15, 12=14, 13=4, 14=1}
   ```
   
   Here is the sample code that produced the above output.
   ```java
     @Test
     public void testKeyBy() {
       final int maxParallelism = 4000;
       final int numberOfTasks = 2000;
       final int numberOfBuckets = 10000;
   
       final Integer[] assignment = new Integer[numberOfTasks];
       for (int i = 0; i < numberOfTasks; ++i) {
         assignment[i] = 0;
       }
   
       for (int i = 0; i < numberOfBuckets; ++i) {
         final String key = "date=20210119/bucket=" + i;
         final int assignedTask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
             key, maxParallelism, numberOfTasks);
         assignment[assignedTask] += 1;
       }
   
       final Map<Integer, Long> assignmentStats = Lists.newArrayList(assignment).stream()
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       System.out.println(assignmentStats);
     }
   ```
   
   To avoid this problem, we have to write a custom partitioner that directly map bucket id to the subtask. The custom partitioner can be passed into `DataStream#partitionCustom()`.
   ```java
     static class BucketPartitioner implements Partitioner<Integer> {
   
       @Override
       public int partition(Integer key, int numPartitions) {
         return key % numPartitions;
       }
     }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I think we're all in agreement to consider behavior across engines and consider settings that can be used anywhere. One of the major differences between batch and streaming is that it would be expensive and unusual to sort the records processed by a task, so actually producing records sorted locally within a task isn't something we should expect. My guess is that Flink won't support either local or global sort -- global is just task-level sort with a shared range partitioner.
   
   We do have a need to shuffle data to the write tasks in Flink. Shuffling by partitions is a good idea to start with because it covers the cases where hash partitioning is balanced. Skew in the partitions that @stevenzwu notes is a concern, but I'm also interested in handling skew in non-partition columns. If downstream reads are likely to filter by `country` in Steven's example, then clustering data by country in Flink is a good idea. Spark uses global ordering to handle this skew, which will estimate ranges of the sort keys and produce a partitioner config that balances the data.
   
   For Flink, what makes sense to me is to have options for the key-by operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` -- where `range(sort key)` is determined using a method like Steven suggests. These distribution options correspond to the ones we plan to support in Spark, just without the task-level sort.
   
   I propose using a table property, `write.distribution.mode`, in both engines with values:
   * `none`: do not shuffle rows
   * `partition`: shuffle rows by `hash(partition)`
   * `sort`: shuffle rows by `range(sort key)` using a ranges provided by a skew calculation or from table metrics
   
   I considered a similar mode for sort, but Flink will probably ignore it and in Spark we are making it implicit: if the table's sort order is defined, then request a sort. So there is no need for an equivalent sort mode property.
   
   If we go with this, then we have one setting that works in both Flink and Spark. In Flink the property controls the `keyBy` behavior only, and in Spark it is combined with sort order to give the options that we've discussed:
   
   | Spark | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
   
   I think Flink would use a different sort key depending on whether the table order is set: it would range distribute by partition key or sort key. Here's an equivalent table:
   
   | Flink | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution or ordering | hash distribute by partition key | range distribute by sort key |




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559389518



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,46 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private boolean shouldShuffleByPartition(Map<String, String> properties) {
+      if (shuffleByPartition == null) {
+        // Use the configured table option if does not specify in FlinkSink explicitly.
+        return PropertyUtil.propertyAsBoolean(properties,
+            WRITE_SHUFFLE_BY_PARTITION,
+            WRITE_SHUFFLE_BY_PARTITION_DEFAULT);
+      } else {
+        return shuffleByPartition;

Review comment:
       That's a fair assessment.
   
   I ask as we typically use job-clusters at my work and then try to specify as much configuration as possible on the cluster's config. This is mostly to provide one easy way to track this, and is definitely tied to how our build and deployment system and configuration system is set up internally at my work.
   
   Outside of job clusters, I would agree that it's too coarse grained. And having the possibility to set it as a job config or a table config is good enough. I'm not even sure if Flink would accept arbitrary configurations at the cluster level (that it isn't aware of).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559264472



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I like Ryan's proposal. Maybe change the "partition" mode to "hash-partition" just to be more accurate. Technically, "range-partition" in the "sort" mode is also a "partition".
   
   For the sort with unordered, "bin-packing" may be more optimal. 
   
   E.g., this is the traffic distribution for a partition column.
   | | | | | | | | | | | |
   | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
   | Bucket | B0 | B1 | B2 | B3 | B4 | B5 | B6 | B7 | B8 | B9 |
   | Weight |  1 | 10 |  1 | 10 | 1 | 10 | 1 | 10 | 1 | 10 |  
   
   Assuming writer parallelism is 11, each writer task should get data with weight of 5 in a perfect world. For buckets with weight of 10, their data are assigned 2 writer tasks. For buckets with weight of 1, they should be bundled to a single writer task. Since this is a partition column, each partition key got written to a separate file anyway.
   
   If this is a non-partition column in ordered scenario, we may not want to apply the bin-packing. Otherwise, one writer task will write a file with bucket 0, 2, 4, 6, 8. Since Iceberg only has min-max column stats, it will result in (min=0, max=8) which is not great for filtering. If Iceberg support `list-of-values` column stats, it might be useful for some scenarios.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560018080



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       > In the short term, I expect Flink to only support none and partition modes.
   
   In this first PR,  I think it's great to support `none` & `partition` mode.  For unsorted table with `sort` mode,  we also had an internal discussion,  using the `DataStream#partitionCustom` (proposed from @stevenzwu ) way should be OK ;   For sorted table,   the records sorting is an problem ( for keeping the same semantics of SortOrder) ,  I suspect whether there's a real need for flink to write sorted records ( Could hava more discussion).   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560563376



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       what if we call config name as `write.shuffle-mode`? would it make it more clear to user regarding `hash` vs `range`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559716019



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -209,7 +224,16 @@ public Builder equalityFieldColumns(List<String> columns) {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
+      // Convert the flink requested table schema to flink row type.
+      RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+      // Shuffle by partition key if possible.
+      if (shouldShuffleByPartition(table.properties()) && !table.spec().isUnpartitioned()) {
+        rowDataInput = rowDataInput.keyBy(new PartitionKeySelector(table.spec(), table.schema(), flinkRowType));

Review comment:
       Hashing over very large cardinality (like `user_id`) is fine. But I should caution about `keyBy` over partition key which has relatively small cardinality (like `bucket(user_id)`). It can get pretty uneven distribution. 
   ```
   {0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37, 11=15, 12=14, 13=4, 14=1}
   ```
   
   Here is the sample code.
   ```java
     @Test
     public void testKeyBy() {
       final int maxParallelism = 4000;
       final int numberOfTasks = 2000;
       final int numberOfBuckets = 10000;
   
       final Integer[] assignment = new Integer[2000];
       for (int i = 0; i < numberOfTasks; ++i) {
         assignment[i] = 0;
       }
   
       for (int i = 0; i < numberOfBuckets; ++i) {
         final String key = "date=20210119/bucket=" + i;
         final int assignedTask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
             key, maxParallelism, numberOfTasks);
         assignment[assignedTask] += 1;
       }
   
       final Map<Integer, Long> assignmentStats = Lists.newArrayList(assignment).stream()
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       System.out.println(assignmentStats);
     }
   ```
   
   To avoid this problem, we have to write a custom partitioner that directly map bucket id to the subtask. The custom partitioner can be passed into `DataStream#partitionCustom()`.
   ```java
     static class BucketPartitioner implements Partitioner<Integer> {
   
       @Override
       public int partition(Integer key, int numPartitions) {
         return key % numPartitions;
       }
     }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555368269



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I think we should make sure all query engines are aligned with this.
   In my view, we should support the following cases:
   - local sort using the table sort order 
   - repartition using partition spec and local sort by the table sort order
   - global sort using the table sort order




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560663819



##########
File path: flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
##########
@@ -101,17 +101,24 @@ public FlinkCatalogTestBase(String catalogName, Namespace baseNamespace) {
     }
     if (isHadoopCatalog) {
       config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
-      config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hadoopWarehouse.getRoot());
     } else {
       config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-      config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
       config.put(CatalogProperties.HIVE_URI, getURI(hiveConf));
     }
+    config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));

Review comment:
       It's not a fix,  just for abstraction,  so that we could get all data files under the given partition here: https://github.com/apache/iceberg/pull/2064/files#diff-0aaa93576853d5b379da121bc5d6161eb888fe15b88e3597374ed894d8c94917R275




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559264472



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I like Ryan's proposal. Maybe change the "partition" mode to "hash-partition" just to be more accurate. Technically, "range-partition" in the "sort" mode is also a "partition".
   
   For the sort with unordered, "bin-packing" may be more optimal. 
   
   E.g., this is the traffic distribution for a partition column.
   | | | | | | | | | | | |
   | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
   | Bucket | B0 | B1 | B2 | B3 | B4 | B5 | B6 | B7 | B8 | B9 |
   | Weight |  1 | 10 |  1 | 10 | 1 | 10 | 1 | 10 | 1 | 10 |  
   
   Assuming writer parallelism is 11, each writer task should get data with weight of 5 in a perfect world. For buckets with weight of 10, their data are assigned 2 writer tasks. For buckets with weight of 1, they should be bundled to a single writer task. Since this is a partition column, each partition key got written to a separate file anyway. So it doesn't hurt data locality and can improve balanced distribution among writer tasks.
   
   If this is a non-partition column in ordered scenario, we may not want to apply the bin-packing and range partition might be better. Otherwise, one writer task will write a file with bucket 0, 2, 4, 6, 8. Since Iceberg only has min-max column stats, it will result in (min=0, max=8) which is not great for filtering. If Iceberg support `list-of-values` column stats, it might be useful for some scenarios.
   
   Not sure if it is a case of over engineering or not though.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560378013



##########
File path: flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
##########
@@ -101,17 +101,24 @@ public FlinkCatalogTestBase(String catalogName, Namespace baseNamespace) {
     }
     if (isHadoopCatalog) {
       config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop");
-      config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hadoopWarehouse.getRoot());
     } else {
       config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
-      config.put(CatalogProperties.WAREHOUSE_LOCATION, "file://" + hiveWarehouse.getRoot());
       config.put(CatalogProperties.HIVE_URI, getURI(hiveConf));
     }
+    config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));

Review comment:
       Is this change related? This looks like a fix for something else.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560373431



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       As I noted in the comment thread, I think that we should use "partition" to describe only table partitions. Otherwise, we are going to create confusion. We can use "hash" and "range" here if there is consensus that "partition" and "sort" are not clear, but I don't think that we should use the term "partition" to refer to distribution within a processing engine.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r557548243



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @openinx those limitations are the result of hash shuffling. If we use traffic distribution stats per bucket/group, we can shuffle the traffic relatively evenly to all downstream writer tasks. Table partition (like number of buckets) doesn't have to tie with writer parallelism, which is inflexible. Weight based shuffle can make use of all writer tasks. It won't be perfect, but should be reasonably good.
   
   Regarding the example of bucket transformation for table partition, it works for table that are partitioned by some well distributed "id" column. We actually have a super large table using this pattern. But that is not a general scenario though. E.g., we may have table partitioned by "ts" and "country" columns.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r562325243



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       > Eventually, we want to get to where the table metadata has a sketch of the data distribution so you can use that to get ranges for a global ordering.
   
   I was also thinking about how to partition the (-oo, +oo) into several even  key ranges ( partition key ranges or sort key ranges) for flink.   Seems this idea is similar to the @stevenzwu 's `list-of-values column stats` from https://github.com/apache/iceberg/pull/2064#discussion_r559264472 .  Yes, that helps a lot if we have such fine-grained column range stats.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555879177



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       cc @jacques-n @omalley @rdblue as it was related to the discussion we had during the last sync.

##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       cc @jacques-n @omalley @rdblue as it is related to the discussion we had during the last sync.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555444419



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       +1 on more generalized semantics. Current option only works if the data is relatively evenly distrubuted across table partitions. Otherwise, heavy data skew can be problematic for writer. The other problem is that effective writer parallelism now is limited by the number of partition values. Let's say the writer parallelism is 100, but the number of unique partition values are only 10. Then only 10 writer subtasks will get the data.
   
   I will add some notes for the streaming write mode. In a streaming job, it is probably impossible to do true sorting. Instead, what can be useful is some sort of "groupBy/bucketing" shuffle in the streaming sink. It can help with reducing too many concurrent open files per writer and improving read performance (predicate pushdown) with better data locality.
   
   E.g., a table is partition by (event_date, country). Without the shuffle, each writer task can write to ~200 files/countries. However, a simple keyBy is also problematic as it can produce heavy data skew for countries like US. Instead, we should calculate stats for each bucket/country and distribute the data based on the weight of each bucket. E.g., we may allocate 100 downstream subtasks for US, while allocating 1 downstream subtask for multiple small countries (like bin packing).
   
   This can also be extended to non-partition column (as logical partitioning), which can improve read performance with filtering. Similar to the above example with the tweak that country is not a partition column anymore. groupBy/bucketing shuffle can help improve data locality. 
   
   I was thinking about a groupBy operator where each subtask (running in taskmanager) can constantly report local statistics to operator coordinator (running in jobmanager), which then does the global aggregation and notify subtasks with the globally aggregated stats.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555444419



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       +1 on more generalized semantics. Current option only works if the data is relatively evenly distrubuted across table partitions. Otherwise, heavy data skew can be problematic for writer. The other problem is that effective writer parallelism now is limited by the number of partition values. Let's say the writer parallelism is 100, but the number of unique partition values are only 10. Then only 10 writer subtasks will get the data.
   
   I will add some notes for the streaming write mode. In a streaming job, it is probably impossible to do true sorting. Instead, what can be useful is some sort of "groupBy/bucketing" shuffle in the streaming sink. It can help with reducing too many concurrent open files per writer and improving read performance (predicate pushdown) with better data locality.
   
   E.g., a table is partition by (event_date, country). Without the shuffle, each writer task can write to ~200 files/countries. However, a simple keyBy is also problematic as it can produce heavy data skew for countries like US. Instead, we should calculate stats for each bucket/country and distribute the data based on the weight of each bucket. E.g., we may allocate 100 downstream subtasks for US, while allocating 1 downstream subtask for multiple small countries (like bin packing).
   
   This can also be extended to non-partition column (as logical partitioning), which can improve read performance with filtering. Similar to the above example with the tweak that country is not a partition column anymore. groupBy/bucketing shuffle can help improve data locality. 
   
   I was thinking about a groupBy/orderBy operator where each subtask (running in taskmanager) can constantly report local statistics to operator coordinator (running in jobmanager), which then does the global aggregation and notify subtasks with the globally aggregated stats.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555916219



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       Maybe Flink sink can add a generic orderBy option that behaves slightly different in batch or streaming mode
   * batch: global sort with skew estimation
   * streaming: global stats aggregation and distribution in operator coordinator and use the stats to do bin packing and split to downstream writers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560641150



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       > Flink may eventually provide a way to order within data files, but I think that is less important than clustering data across files so that data files can be skipped in queries.
   
   Agreed. Though sorting within data file would be really helpful for page skipping,  but that would introduce more cost for streaming processing job.  Range distribution by sorted keys is some kind of coarse granularity,  but it's good enough for streaming job to cluster keys for filtering among data files,  I think it's a better balanced choice when trade off between write efficiency and read performances. 
   
   It make sense to me that rewriting those range distributed data files into row-ordering files if there're heavy reads that depends on them. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] electrum commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
electrum commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r561367698



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @rdblue thanks for the pointer. Here are my thoughts on how this would work for Trino (formerly Presto SQL).
   
   Trino does streaming execution between stages -- there is no materialized shuffle phase. This means that global sorting would only be possible using a fixed range, not based on statistics, so it would be vulnerable to skew. I'd like to understand the use case for global "sort" compared to "partition".
   
   For local sorting, I see two choices:
   
   1. Write arbitrarily large files. Use a fixed size in-memory buffer, sort when full, write to temporary file, then merge files at end. There may be multiple merge passes in order to limit the number of files read at once during the merge. This is what we do for Hive bucketed-sorted tables, since sorting per bucket is required.
   2. Write multiple size-limited files. Use a fixed size in-memory buffer, sort when full, write final output file. Repeat until all input data for writer has been consumed.
   
   I would prefer the second option as it is simpler and uses fewer resources. It satisfies the property that each file is sorted and helps with compression and within-file filtering. The downside is that there are more files, but if they are of sufficient size, it shouldn't affect reads as we split files anyway when reading.
   
   Another option is to sort data using a fixed size buffer before writing each batch of rows. This would help with compression and within-file filtering, but wouldn't provide a guarantee on sorting for readers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559076627



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       We have touched this topic a few times during the community syncs and the table property approach combined with the partition spec and sort order seems reasonable to me. I think we would want to send a mail to the dev list once we have a local consensus here to make sure other query engines are ok with our idea.
   
   I like the table property approach versus a job/session config because it is enough to change it in one place and the change will be automatically propagated to all jobs without any changes in the job/session config. That being said, I think there should be a way to override the distribution mode in a job. In Spark, we can do that with write options. I think the same applies to cases when different query engines need different behavior. For example, Flink may want to use `none` and Spark batch jobs may want `sort`. Then having a default value in table props and the ability to override in a job should be sufficient.
   
   I support the idea of having one table property for all query engines as it seems flexible enough to cover various cases (at least, for Flink and Spark). If others have concerns, I'll be okay with engine-specific properties as well. I also understand that not all query engines will support this property right away. So having separate props may actually give us some benefits.
   
   @rdblue, thanks for the table! I think for Spark, if the table is unordered, we would want to add a local sort based on partition columns in all modes with a write option to disable inference. What do you think?
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560449935



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       +1 on not using the term "partition" when talking about distribution.
   
   W.r.t. naming, I did ask myself the question what should be the best names here. I tend to like "hash" and "range" a bit more as it may not be clear that `partition` refers to the table's partition spec.
   
   I guess the real question here is what does this table property control? Are we allowing to control whether to use hash or range distribution or do we control whether the distribution is based on the partition spec or sort order?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559714619



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       Hashing over very large cardinality (like `user_id`) is fine. But I should caution about `keyBy` over partition key which has relatively small cardinality (like `bucket(user_id)`). It can get pretty uneven distribution. 
   ```
   {0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37, 11=15, 12=14, 13=4, 14=1}
   ```
   
   Here is the sample code.
   ```java
     @Test
     public void testKeyBy() {
       final int maxParallelism = 4000;
       final int numberOfTasks = 2000;
       final int numberOfBuckets = 10000;
   
       final Integer[] assignment = new Integer[2000];
       for (int i = 0; i < numberOfTasks; ++i) {
         assignment[i] = 0;
       }
   
       for (int i = 0; i < numberOfBuckets; ++i) {
         final String key = "date=20210119/bucket=" + i;
         final int assignedTask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
             key, maxParallelism, numberOfTasks);
         assignment[assignedTask] += 1;
       }
   
       final Map<Integer, Long> assignmentStats = Lists.newArrayList(assignment).stream()
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       System.out.println(assignmentStats);
     }
   ```
   
   To avoid this problem, we have to write a custom partitioner that directly map bucket id to the subtask and pass it to `DataStream#partitionCustom()`.
   ```java
     static class BucketPartitioner implements Partitioner<Integer> {
   
       @Override
       public int partition(Integer key, int numPartitions) {
         return key % numPartitions;
       }
     }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559716019



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -209,7 +224,16 @@ public Builder equalityFieldColumns(List<String> columns) {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
+      // Convert the flink requested table schema to flink row type.
+      RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+      // Shuffle by partition key if possible.
+      if (shouldShuffleByPartition(table.properties()) && !table.spec().isUnpartitioned()) {
+        rowDataInput = rowDataInput.keyBy(new PartitionKeySelector(table.spec(), table.schema(), flinkRowType));

Review comment:
       Hashing over very large cardinality (like `user_id`) is fine. But I should caution about `keyBy` over partition key which has relatively small cardinality (like `bucket(user_id)`). It can get pretty uneven distribution. 
   ```
   {0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37, 11=15, 12=14, 13=4, 14=1}
   ```
   
   Here is the sample code that produced the above output.
   ```java
     @Test
     public void testKeyBy() {
       final int maxParallelism = 4000;
       final int numberOfTasks = 2000;
       final int numberOfBuckets = 10000;
   
       final Integer[] assignment = new Integer[numberOfTasks];
       for (int i = 0; i < numberOfTasks; ++i) {
         assignment[i] = 0;
       }
   
       for (int i = 0; i < numberOfBuckets; ++i) {
         final String key = "date=20210119/bucket=" + i;
         final int assignedTask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
             key, maxParallelism, numberOfTasks);
         assignment[assignedTask] += 1;
       }
   
       final Map<Integer, Long> assignmentStats = Lists.newArrayList(assignment).stream()
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       System.out.println(assignmentStats);
     }
   ```
   
   To avoid this problem, we have to write a custom partitioner that directly map bucket id to the subtask. The custom partitioner can be passed into `DataStream#partitionCustom()`.
   ```java
     static class BucketPartitioner implements Partitioner<Integer> {
   
       @Override
       public int partition(Integer key, int numPartitions) {
         return key % numPartitions;
       }
     }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555369070



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       In Spark, our plan was to support the following commands:
   
   ```
   -- global
   ALTER TABLE WRITE
   ORDERED BY p1, bucket(id, 128), c1, c2
   
   -- hash + local sort
   ALTER TABLE WRITE
   DISTRIBUTED BY p1, bucket(id, 128)
   LOCALLY ORDERED BY p1, bucket(id, 128), c1, c2
   
   -- local sort
   ALTER TABLE WRITE
   LOCALLY ORDERED BY p1, bucket(id, 128), c1, c2
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560480872



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       I think it controls whether we use hash distribution or range distribution. I agree that's more clear from a developer's perspective. My concern is that users won't know what hash and range are, but they do understand what a partition is and what sorting is.
   
   Let's go with hash and range for now. I think we can explain it well enough in docs, and we can also add aliases that are more clear if needed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r557099820



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       Let me collect all the question here: 
   
   > Align the table properties to other engines ?
   
   Yes,  I agreed.   @aokolnychyi ,  I think the three write modes are related to __SortOrder__  specification,  different mode decides the real write behavior.  (btw, what's the semantic of __local sort__,  global sort is quite easy to understand,  does the local sort means we will buffer those records in a in-memory sorted map,  and then  flush them into a file once reached memory-threshold ?  Then all the records written by the same task are always sorted locally ? ) .  
   
   Back to this PR,  we don't define the sort order write behavior for flink.   The purpose is reducing small files by shuffling so that each sub-task don't have to write so many files.   It's a specific option for streaming job (As @stevenzwu  said,  we don't do real sort in flink streaming because it's expensive to accomplish sort when processing record one by one incrementally ).    I'm thinking it's not good to define this as a global table properties because it's actually a job-level configuration key.  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560619616



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_DISTRIBUTION_MODE = "write.distribution-mode";
+  public static final String WRITE_DISTRIBUTION_MODE_DEFAULT = "none";

Review comment:
       define the default value directly as the enum?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560387985



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       "Partition" and "sort" aren't very clear to me. Both "hash partition" and "range partition" are "partitions". But in the table, they are listed as "partition" and "sort".
   
   The general concepts that @rdblue defined in the table above are still very good guidances for us to think about those dimensions. But if Flink and Spark are going to support different behaviors, maybe it is better for them to define different values to be more accurately describe the behavior.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r554935039



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
##########
@@ -220,4 +232,52 @@ public void testInsertIntoPartition() throws Exception {
 
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
   }
+
+  @Test
+  public void testWithShuffleByPartition() throws Exception {
+    String tableName = "test_shuffle_by_partition";
+
+    Map<String, String> tableProps = ImmutableMap.of(
+        "write.format.default", format.name(),
+        "write.shuffle-by.partition", "true"
+    );
+    sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
+        tableName, toWithClause(tableProps));
+
+    // Insert data set.
+    sql("INSERT INTO %s VALUES " +
+        "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
+        "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
+        "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+    SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
+        SimpleDataUtil.createRecord(1, "aaa"),
+        SimpleDataUtil.createRecord(1, "bbb"),
+        SimpleDataUtil.createRecord(1, "ccc"),
+        SimpleDataUtil.createRecord(2, "aaa"),
+        SimpleDataUtil.createRecord(2, "bbb"),
+        SimpleDataUtil.createRecord(2, "ccc"),
+        SimpleDataUtil.createRecord(3, "aaa"),
+        SimpleDataUtil.createRecord(3, "bbb"),
+        SimpleDataUtil.createRecord(3, "ccc")
+    ));
+
+    Assert.assertEquals("Should 1 data file in partition 'aaa'", 1, partitionFiles(tableName, "aaa").size());

Review comment:
       Small nit: Consider `There should be [only] 1 data file in partition 'aaa'`. And similarly for the other files.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,46 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private boolean shouldShuffleByPartition(Map<String, String> properties) {
+      if (shuffleByPartition == null) {
+        // Use the configured table option if does not specify in FlinkSink explicitly.
+        return PropertyUtil.propertyAsBoolean(properties,
+            WRITE_SHUFFLE_BY_PARTITION,
+            WRITE_SHUFFLE_BY_PARTITION_DEFAULT);
+      } else {
+        return shuffleByPartition;

Review comment:
       Somewhat unrelated question: Is it possible to set these values at the cluster level (like with Flink's properties), or is it only code property then table property only? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r557548243



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @openinx those limitations are the result of hash shuffling. If we use traffic distribution stats per bucket/group, we can shuffle the traffic relatively evenly to all downstream writer tasks. Table partition (like number of buckets) doesn't have to tie with writer parallelism, which is inflexible. Weight based shuffle can make use of all writer tasks. It won't be perfect, but should be reasonably good.
   
   Regarding the example of bucket transformation for table partition, it works for table that are partitioned by some well distributed "id" column. We actually have a super large table using this pattern. But that is not a general scenario though, as not every table can to want to have a bucket partition. E.g., we may have table partitioned only by "ts" and "country" columns.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#issuecomment-763021324


   This looks nearly ready. Mainly, I would like to get consensus on the config values.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r554991546



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,46 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private boolean shouldShuffleByPartition(Map<String, String> properties) {
+      if (shuffleByPartition == null) {
+        // Use the configured table option if does not specify in FlinkSink explicitly.
+        return PropertyUtil.propertyAsBoolean(properties,
+            WRITE_SHUFFLE_BY_PARTITION,
+            WRITE_SHUFFLE_BY_PARTITION_DEFAULT);
+      } else {
+        return shuffleByPartition;

Review comment:
       I think it's flexible to provide job-level and table-level for this option,  shuffling by partition for the whole cluster's job seems to be coarse granularity. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560482344



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       FYI @electrum. You may be interested in this discussion for recommended write behavior from table config.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560042631



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       > Maybe change the "partition" mode to "hash-partition" just to be more accurate. Technically, "range-partition" in the "sort" mode is also a "partition".
   
   I guess the `partition` from @rdblue 's propose  means that it will distribute the records by hashing partition-keys,  the `sort` means that it will distribute the records by sorted keys if the table has one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560371817



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @stevenzwu, I used `partition` to describe distributing by partition key. I'm trying to use "partition" to refer to table partitions, not partitioning in an engine, which I would refer to as "distribution" in table metadata. I'm making that distinction to avoid having multiple definitions of "partition" that users need to think about, so "partition" should refer to table partitioning and "distribution" would refer to how data is assigned to tasks in an engine.
   
   You also raise a good point about partitioning. If the number of partitions is going to be the same order of magnitude as the number of writers, then a hash assignment strategy could be a problem; some tasks could get two partitions and some could get none. But, I'm not sure how to detect that case and know the output size to use your bin-packing suggestion. We could add a factor to distribute rows to N writers per partition to help balance the hashing. And when the number of partitions is high, hash assignment would work well. I think if we have a situation like you're describing, the `sort` distribution mode is going to be the best option.
   
   For now, I think the current `keyBy` implementation is a step in the right direction. We can iterate to improve on it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#issuecomment-763263425


   I think there is consensus around using "none", "hash", and "range" for the distribution mode. Once that's implemented, I think this is ready to commit. I also had some other minor comments.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559264472



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I like Ryan's proposal. Maybe change the "partition" mode to "hash-partition" just to be more accurate. Technically, "range-partition" in the "sort" mode is also a "partition".
   
   For the sort with unordered, "bin-packing" may be more optimal. 
   
   E.g., this is the traffic distribution for a partition column.
   Bucket:  B0, B1, B2, B3, B4, B5, B6, B7, B8, B9
   Weight:  1,   10,  1,   10,   1,   10,   1,   10,  1,    10   
   
   Assuming writer parallelism is 11, each writer task should get data with weight of 5 in a perfect world. For buckets with weight of 10, their data are assigned 2 writer tasks. For buckets with weight of 1, they should be bundled to a single writer task. Since this is a partition column, each partition key got written to a separate file anyway.
   
   If this is a non-partition column in ordered scenario, we may not want to apply the bin-packing. Otherwise, one writer task will write a file with bucket 0, 2, 4, 6, 8. Since Iceberg only has min-max column stats, it will result in (min=0, max=8) which is not great for filtering. If Iceberg support `list-of-values` column stats, it might be useful for some scenarios.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560365972



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       The sort order of a table is a recommendation, not a requirement. And you're right that it is for writing. That's why the DDL to update it is `WRITE ORDERED BY ...`.
   
   We don't guarantee a sort order on read except when a data or (eq) delete file has a sort order in metadata (see #1975). The sort order for a table may change and even if writes are globally sorted, multiple writes to the same partitions will produce different file sets that can't be read in order to produce sorted records. That's why we don't make guarantees about reads. Ordering on write is primarily a way to cluster rows for efficient filtering.
   
   If row-level ordering is expensive, as it is for Flink, then it is perfectly fine to ignore the recommendation. Flink may eventually provide a way to order within data files, but I think that is less important than clustering data across files so that data files can be skipped in queries. That's what Steven's idea would achieve, along with handling skew.
   
   It is still valuable to have a write order, even if Flink doesn't guarantee it. If Flink can cluster data by that order, then that's really helpful. And, other services can rewrite those data files after the data is available if row ordering is needed for page skipping within data files. A service that sorts data files after Flink writes them also needs to know the desired order.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I think we're all in agreement to consider behavior across engines and consider settings that can be used anywhere. One of the major differences between batch and streaming is that it would be expensive and unusual to sort the records processed by a task, so actually producing records sorted locally within a task isn't something we should expect. My guess is that Flink won't support either local or global sort -- global is just task-level sort with a shared range partitioner.
   
   We do have a need to shuffle data to the write tasks in Flink. Shuffling by partitions is a good idea to start with because it covers the cases where hash partitioning is balanced. Skew in the partitions that @stevenzwu notes is a concern, but I'm also interested in handling skew in non-partition columns. If downstream reads are likely to filter by `country` in Steven's example, then clustering data by country in Flink is a good idea even if it isn't a partition column. Spark uses global ordering to handle this skew, which will estimate ranges of the sort keys and produce a partitioner config that balances the data.
   
   For Flink, what makes sense to me is to have options for the key-by operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` -- where `range(sort key)` is determined using a method like Steven suggests. These distribution options correspond to the ones we plan to support in Spark, just without the task-level sort.
   
   I propose using a table property, `write.distribution.mode`, in both engines with values:
   * `none`: do not shuffle rows
   * `partition`: shuffle rows by `hash(partition)`
   * `sort`: shuffle rows by `range(sort key)` using a ranges provided by a skew calculation or from table metrics
   
   I considered a similar mode for sort, but Flink will probably ignore it and in Spark we are making it implicit: if the table's sort order is defined, then request a sort. So there is no need for an equivalent sort mode property.
   
   If we go with this, then we have one setting that works in both Flink and Spark. In Flink the property controls the `keyBy` behavior only, and in Spark it is combined with sort order to give the options that we've discussed:
   
   | Spark | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
   
   I think Flink would use a different sort key depending on whether the table order is set: it would range distribute by partition key or sort key. Here's an equivalent table:
   
   | Flink | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution or ordering | hash distribute by partition key | range distribute by sort key |
   
   In the short term, I expect Flink to only support `none` and `partition` modes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r562160730



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @electrum, as far as what a "local sort" means, I think option 2 sounds good to me for a task-level sort. If that sort is needlessly expensive, then it is okay for Trino to skip it. But I think that if a table has a defined sort order, the right thing would be for Trino to apply it.
   
   For data distribution, it sounds like Trino will only support `none` and `hash` modes in the short term. That's reasonable given that you can't stage data and use it twice. Even with shuffle data reuse, global sort in Spark is quite expensive in some cases (doing a large join twice, for example). Eventually, we want to get to where the table metadata has a sketch of the data distribution so you can use that to get ranges for a global ordering.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r562325243



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       > Eventually, we want to get to where the table metadata has a sketch of the data distribution so you can use that to get ranges for a global ordering.
   
   I was also thinking about how to partition the (-oo, +oo) into several even  key ranges ( partition key ranges or sort key ranges) for flink.   Seems this idea is similar to the @stevenzwu 's `list-of-values column stats` from https://github.com/apache/iceberg/pull/2064#discussion_r559264472 .  Yes, that helps a lot if we have such fine-grained column range stats.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r555685966



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I have the same concern as @stevenzwu that a hash distribution by partition spec would co-locate all entries for the same partition in the same task, potentially leading to having too much data in a task. The global sort in Spark would be a better option here for batch jobs as it will do skew estimation and the sort order can be used to split data for the same partition across multiple tasks.
   
   To sum up, I think we should be flexible and support 3 modes to cover different use cases.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560400754



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,71 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+                                                     Map<String, String> properties,
+                                                     PartitionSpec partitionSpec,
+                                                     Schema iSchema,
+                                                     RowType flinkRowType) {
+      DistributionMode writeMode;
+      if (distributionMode == null) {
+        // Fallback to use distribution mode parsed from table properties if don't specify in job level.
+        String modeName = PropertyUtil.propertyAsString(properties,
+            WRITE_DISTRIBUTION_MODE,
+            WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+        writeMode = DistributionMode.fromName(modeName);
+      } else {
+        writeMode = distributionMode;
+      }
+
+      switch (writeMode) {
+        case NONE:
+          return input;
+
+        case HASH:
+          if (partitionSpec.isUnpartitioned()) {
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));

Review comment:
       This isn't going to cover all cases, but I think it is a necessary first step. Data skew is going to require range distribution.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560643997



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       @stevenzwu ,   I think `write.shuffle-mode` is enough to express the write behavior of flink, but not enough to express the write behavior of spark, because spark will distribute those records with local sort or global sort. 
   
   +1 on keeping `write.distribution-mode` and using the `hash` & `range` values now ( Though the `range` does not fully express the sort meaning from spark, but I can not think of a better word to express the exact meaning for both flink and spark ). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560006175



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @rdblue  I like the table you provided,  I have one question : For an iceberg table which has defined its __SortOder__  columns,   the spark job will write the sorted records (based on sort keys) into parquet files,  should the flink job also write the sorted records into parquet files ?  Should we keep the same semantic of __SortOrder__  among different engines  although it's not cheap to accomplish the goal ? (  I raise this question because I saw the __Flink__ table  does not require locally sorted or global sorted )  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560042631



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       > Maybe change the "partition" mode to "hash-partition" just to be more accurate. Technically, "range-partition" in the "sort" mode is also a "partition".
   
   I guess the `partition` from @rdblue 's propose  means that it will distribute the records by hashing partition-keys,  the `sort` means that it will distribute the records by sorted keys if the table has one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560375440



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,71 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+                                                     Map<String, String> properties,
+                                                     PartitionSpec partitionSpec,
+                                                     Schema iSchema,
+                                                     RowType flinkRowType) {
+      DistributionMode writeMode;
+      if (distributionMode == null) {
+        // Fallback to use distribution mode parsed from table properties if don't specify in job level.
+        String modeName = PropertyUtil.propertyAsString(properties,
+            WRITE_DISTRIBUTION_MODE,
+            WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+        writeMode = DistributionMode.fromName(modeName);
+      } else {
+        writeMode = distributionMode;
+      }
+
+      switch (writeMode) {
+        case NONE:
+          return input;
+
+        case HASH:
+          if (partitionSpec.isUnpartitioned()) {
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          }
 
-    RowType flinkSchema;
+        case RANGE:
+          throw new UnsupportedOperationException("The write.distribution-mode=range is not supported in flink now");

Review comment:
       By throwing an exception here, users could break jobs by setting the distribution mode. Is that okay? I guess it wouldn't affect running jobs because they are already configured.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559076627



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       We have touched this topic a few times during the community syncs and the table property approach combined with the partition spec and sort order seems reasonable to me. I think we would want to send a mail to the dev list once we have a local consensus here to make sure other query engines are ok with our idea.
   
   I like the table property approach versus a job/session config because it is enough to change it in one place and the change will be automatically propagated to all jobs without any changes in the job/session config. That being said, I think there should be a way to override the distribution mode in a job. In Spark, we can do that with write options. I think the same applies to cases when different query engines need different behavior. For example, Flink may want to use `none` and Spark batch jobs may want `sort`. Then having a default value in table props and the ability to override in a job should be sufficient.
   
   I support the idea of having one table property for all query engines as it seems flexible enough to cover various cases (at least, for Flink and Spark). If others have concerns, I'll be okay with engine-specific properties as well. I also understand that not all query engines will support this property right away.
   
   @rdblue, thanks for the table! I think for Spark, if the table is unordered, we would want to add a local sort based on partition columns in all modes with a write option to disable inference. What do you think?
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I think we're all in agreement to consider behavior across engines and consider settings that can be used anywhere. One of the major differences between batch and streaming is that it would be expensive and unusual to sort the records processed by a task, so actually producing records sorted locally within a task isn't something we should expect. My guess is that Flink won't support either local or global sort -- global is just task-level sort with a shared range partitioner.
   
   We do have a need to shuffle data to the write tasks in Flink. Shuffling by partitions is a good idea to start with because it covers the cases where hash partitioning is balanced. Skew in the partitions that @stevenzwu notes is a concern, but I'm also interested in handling skew in non-partition columns. If downstream reads are likely to filter by `country` in Steven's example, then clustering data by country in Flink is a good idea even if it isn't a partition column. Spark uses global ordering to handle this skew, which will estimate ranges of the sort keys and produce a partitioner config that balances the data.
   
   For Flink, what makes sense to me is to have options for the key-by operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` -- where `range(sort key)` is determined using a method like Steven suggests. These distribution options correspond to the ones we plan to support in Spark, just without the task-level sort.
   
   I propose using a table property, `write.distribution-mode`, in both engines with values:
   * `none`: do not shuffle rows
   * `partition`: shuffle rows by `hash(partition)`
   * `sort`: shuffle rows by `range(sort key)` using a ranges provided by a skew calculation or from table metrics
   
   I considered a similar mode for sort, but Flink will probably ignore it and in Spark we are making it implicit: if the table's sort order is defined, then request a sort. So there is no need for an equivalent sort mode property.
   
   If we go with this, then we have one setting that works in both Flink and Spark. In Flink the property controls the `keyBy` behavior only, and in Spark it is combined with sort order to give the options that we've discussed:
   
   | Spark | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution, locally sort by partition key | hash distribute by partition key, locally sort by partition key | range distribute by partition key, locally sort by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
   
   I think Flink would use a different sort key depending on whether the table order is set: it would range distribute by partition key or sort key. Here's an equivalent table:
   
   | Flink | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution or ordering | hash distribute by partition key | range distribute by sort key |
   
   In the short term, I expect Flink to only support `none` and `partition` modes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559803060



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       Hashing over very large cardinality (like `user_id`) is fine. But I should caution about `keyBy` over partition key which has relatively small cardinality (like `bucket(user_id)`). It can get pretty uneven distribution. 
   ```
   {0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37, 11=15, 12=14, 13=4, 14=1}
   ```
   
   Here is the sample code that simulate the scenario that we experienced and probably also cover the hash keyBy of partitionKey in this PR.
   ```java
     @Test
     public void testKeyBy() {
       final int maxParallelism = 4000;
       final int numberOfTasks = 2000;
       final int numberOfBuckets = 10000;
   
       final Integer[] assignment = new Integer[numberOfTasks];
       for (int i = 0; i < numberOfTasks; ++i) {
         assignment[i] = 0;
       }
   
       for (int i = 0; i < numberOfBuckets; ++i) {
         // using integer key generates similar result
   //      final Integer key = i;
         final String key = "date=20210119/bucket=" + i;
         final int assignedTask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
             key, maxParallelism, numberOfTasks);
         assignment[assignedTask] += 1;
       }
   
       final Map<Integer, Long> assignmentStats = Lists.newArrayList(assignment).stream()
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       System.out.println(assignmentStats);
     }
   ```
   
   To avoid this problem, we have to write a custom partitioner that directly map bucket id to the subtask. The custom partitioner can be passed into `DataStream#partitionCustom()`.
   ```java
     static class BucketPartitioner implements Partitioner<Integer> {
   
       @Override
       public int partition(Integer key, int numPartitions) {
         return key % numPartitions;
       }
     }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I think we're all in agreement to consider behavior across engines and consider settings that can be used anywhere. One of the major differences between batch and streaming is that it would be expensive and unusual to sort the records processed by a task, so actually producing records sorted locally within a task isn't something we should expect. My guess is that Flink won't support either local or global sort -- global is just task-level sort with a shared range partitioner.
   
   We do have a need to shuffle data to the write tasks in Flink. Shuffling by partitions is a good idea to start with because it covers the cases where hash partitioning is balanced. Skew in the partitions that @stevenzwu notes is a concern, but I'm also interested in handling skew in non-partition columns. If downstream reads are likely to filter by `country` in Steven's example, then clustering data by country in Flink is a good idea. Spark uses global ordering to handle this skew, which will estimate ranges of the sort keys and produce a partitioner config that balances the data.
   
   For Flink, what makes sense to me is to have options for the key-by operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` -- where `range(sort key)` is determined using a method like Steven suggests. These distribution options correspond to the ones we plan to support in Spark, just without the task-level sort.
   
   I propose using a table property, `write.distribution.mode`, in both engines with values:
   * `none`: do not shuffle rows
   * `partition`: shuffle rows by `hash(partition)`
   * `sort`: shuffle rows by `range(sort key)` using a ranges provided by a skew calculation or from table metrics
   
   I considered a similar mode for sort, but Flink will probably ignore it and in Spark we are making it implicit: if the table's sort order is defined, then request a sort. So there is no need for an equivalent sort mode property.
   
   If we go with this, then we have one setting that works in both Flink and Spark. In Flink the property controls the `keyBy` behavior only, and in Spark it is combined with sort order to give the options that we've discussed:
   
   | Spark | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
   
   I think Flink would use a different sort key depending on whether the table order is set: it would range distribute by partition key or sort key. Here's an equivalent table:
   
   | Flink | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution or ordering | hash distribute by partition key | range distribute by sort key |
   
   In the short term, I expect Flink to only support `none` and `partition` modes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560387985



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       "Partition" and "sort" aren't very clear to me. Both "hash partition" and "range partition" are "partitions". But in the table, they are listed as "partition" and "sort".
   
   If Flink and Spark are going to support different behaviors. Maybe it is better for them to define different values to be more accurately describe the behavior.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560650931



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,71 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+                                                     Map<String, String> properties,
+                                                     PartitionSpec partitionSpec,
+                                                     Schema iSchema,
+                                                     RowType flinkRowType) {
+      DistributionMode writeMode;
+      if (distributionMode == null) {
+        // Fallback to use distribution mode parsed from table properties if don't specify in job level.
+        String modeName = PropertyUtil.propertyAsString(properties,
+            WRITE_DISTRIBUTION_MODE,
+            WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+        writeMode = DistributionMode.fromName(modeName);
+      } else {
+        writeMode = distributionMode;
+      }
+
+      switch (writeMode) {
+        case NONE:
+          return input;
+
+        case HASH:
+          if (partitionSpec.isUnpartitioned()) {
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          }
 
-    RowType flinkSchema;
+        case RANGE:
+          throw new UnsupportedOperationException("The write.distribution-mode=range is not supported in flink now");

Review comment:
       There are two cases: 
   Case.1 :   people configure the distribution-mode in job-level to `RANGE`,  as we don't support it now so we'd better to throw `UnsupportedOperationException`  now; 
   
   Case. 2:   people change an existing table's properties from `NONE` to `RANGE`,  then all running flink jobs wont' be affected unless restarting,  the newly started flink job will be required to use `NONE` or `HASH`.  It's not friendly to break all existing jobs when restarting,  let me add a warn log and just keep the default `NONE` behavior.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] electrum commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
electrum commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r561367698



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @rdblue thanks for the pointer. Here are my thoughts on how this would work for Trino (formerly Presto SQL).
   
   Trino does streaming execution between stages -- there is no materialized shuffle phase. This means that global sorting would only be possible using a fixed range, not based on statistics, so it would be vulnerable to skew. I'd like to understand the use case for global "sort" compared to "partition".
   
   For local sorting, I see two choices:
   
   1. Write arbitrarily large files. Use a fixed size in-memory buffer, sort when full, write to temporary file, then merge files at end. There may be multiple merge passes in order to limit the number of files read at once during the merge. This is what we do for Hive bucketed-sorted tables, since sorting per bucket is required.
   2. Write multiple size-limited files. Use a fixed size in-memory buffer, sort when full, write final output file. Repeat until all input data for writer has been consumed.
   
   I would prefer the second option as it is simpler and uses fewer resources. It satisfies the property that each file is sorted and helps with compression and within-file filtering. The downside is that there are more files, but if they are of sufficient size, it shouldn't affect reads as we split files anyway when reading.
   
   Another option is to sort data using a fixed size buffer before writing each batch of rows. This would help with compression and within-file filtering, but wouldn't provide a guarantee on sorting for readers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#issuecomment-763787609


   Looks great, thanks for working on this @openinx!
   
   And thanks to everyone that helped discuss the configuration!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560563486



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,71 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+                                                     Map<String, String> properties,
+                                                     PartitionSpec partitionSpec,
+                                                     Schema iSchema,
+                                                     RowType flinkRowType) {
+      DistributionMode writeMode;
+      if (distributionMode == null) {
+        // Fallback to use distribution mode parsed from table properties if don't specify in job level.
+        String modeName = PropertyUtil.propertyAsString(properties,
+            WRITE_DISTRIBUTION_MODE,
+            WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+        writeMode = DistributionMode.fromName(modeName);
+      } else {
+        writeMode = distributionMode;
+      }
+
+      switch (writeMode) {
+        case NONE:
+          return input;
+
+        case HASH:
+          if (partitionSpec.isUnpartitioned()) {
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));

Review comment:
       that is fair




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559716019



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -209,7 +224,16 @@ public Builder equalityFieldColumns(List<String> columns) {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
+      // Convert the flink requested table schema to flink row type.
+      RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+      // Shuffle by partition key if possible.
+      if (shouldShuffleByPartition(table.properties()) && !table.spec().isUnpartitioned()) {
+        rowDataInput = rowDataInput.keyBy(new PartitionKeySelector(table.spec(), table.schema(), flinkRowType));

Review comment:
       Hashing over very large cardinality (like `user_id`) is fine. But I should caution about `keyBy` over partition key which has relatively small cardinality (like `bucket(user_id)`). It can get pretty uneven distribution. 
   ```
   {0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37, 11=15, 12=14, 13=4, 14=1}
   ```
   
   Here is the sample code.
   ```java
     @Test
     public void testKeyBy() {
       final int maxParallelism = 4000;
       final int numberOfTasks = 2000;
       final int numberOfBuckets = 10000;
   
       final Integer[] assignment = new Integer[2000];
       for (int i = 0; i < numberOfTasks; ++i) {
         assignment[i] = 0;
       }
   
       for (int i = 0; i < numberOfBuckets; ++i) {
         final String key = "date=20210119/bucket=" + i;
         final int assignedTask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
             key, maxParallelism, numberOfTasks);
         assignment[assignedTask] += 1;
       }
   
       final Map<Integer, Long> assignmentStats = Lists.newArrayList(assignment).stream()
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       System.out.println(assignmentStats);
     }
   ```
   
   To avoid this problem, we have to write a custom partitioner that directly map bucket id to the subtask and pass it to `DataStream#partitionCustom()`.
   ```java
     static class BucketPartitioner implements Partitioner<Integer> {
   
       @Override
       public int partition(Integer key, int numPartitions) {
         return key % numPartitions;
       }
     }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r557109188



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       > Otherwise, heavy data skew can be problematic for writer.
   
   @stevenzwu  We've considered this data skew issue. In my mind,  it's recommended to define bucket in table's PartitionSpec, for example: 
   
   ```java
       PartitionSpec spec = PartitionSpec.builderFor(table.schema())
           .day("ts")
           .bucket("id", 16)
           .build();
   ```
   
   Then the currently key-by method  (The [partitionKey#toPath](https://github.com/apache/iceberg/pull/2064/files#diff-0fa7d66fbfe363dd2992c26a69e3f29b631533fe1c7ab549e83c1f3f0d49153dR62) is actually the ".../ts_day=2020-01-01/bucket-14/..." ) will dispatch the input records into different bucket randomly.   I mean if people defines their buckets under partition path,  then we don't have to introduce such a complex dispatch-policy ( collecting the partition's statistic and dispatch records based on a weight value ).    Does that make sense for you ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I think we're all in agreement to consider behavior across engines and consider settings that can be used anywhere. One of the major differences between batch and streaming is that it would be expensive and unusual to sort the records processed by a task, so actually producing records sorted locally within a task isn't something we should expect. My guess is that Flink won't support either local or global sort -- global is just task-level sort with a shared range partitioner.
   
   We do have a need to shuffle data to the write tasks in Flink. Shuffling by partitions is a good idea to start with because it covers the cases where hash partitioning is balanced. Skew in the partitions that @stevenzwu notes is a concern, but I'm also interested in handling skew in non-partition columns. If downstream reads are likely to filter by `country` in Steven's example, then clustering data by country in Flink is a good idea even if it isn't a partition column. Spark uses global ordering to handle this skew, which will estimate ranges of the sort keys and produce a partitioner config that balances the data.
   
   For Flink, what makes sense to me is to have options for the key-by operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` -- where `range(sort key)` is determined using a method like Steven suggests. These distribution options correspond to the ones we plan to support in Spark, just without the task-level sort.
   
   I propose using a table property, `write.distribution-mode`, in both engines with values:
   * `none`: do not shuffle rows
   * `partition`: shuffle rows by `hash(partition)`
   * `sort`: shuffle rows by `range(sort key)` using a ranges provided by a skew calculation or from table metrics
   
   I considered a similar mode for sort, but Flink will probably ignore it and in Spark we are making it implicit: if the table's sort order is defined, then request a sort. So there is no need for an equivalent sort mode property.
   
   If we go with this, then we have one setting that works in both Flink and Spark. In Flink the property controls the `keyBy` behavior only, and in Spark it is combined with sort order to give the options that we've discussed:
   
   | Spark | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
   
   I think Flink would use a different sort key depending on whether the table order is set: it would range distribute by partition key or sort key. Here's an equivalent table:
   
   | Flink | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | hash distribute by partition key | range distribute by partition key |
   | ordered | no distribution or ordering | hash distribute by partition key | range distribute by sort key |
   
   In the short term, I expect Flink to only support `none` and `partition` modes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559714619



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       Hashing over very large cardinality (like `user_id`). I also like to caution about `keyBy` over partition key which has relatively small cardinality (like `bucket(user_id)`). It can get pretty uneven distribution. 
   ```
   {0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37, 11=15, 12=14, 13=4, 14=1}
   ```
   
   Here is the sample code.
   ```java
     @Test
     public void testKeyBy() {
       final int maxParallelism = 4000;
       final int numberOfTasks = 2000;
       final int numberOfBuckets = 10000;
   
       final Integer[] assignment = new Integer[2000];
       for (int i = 0; i < numberOfTasks; ++i) {
         assignment[i] = 0;
       }
   
       for (int i = 0; i < numberOfBuckets; ++i) {
         final String key = "date=20210119/bucket=" + i;
         final int assignedTask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
             key, maxParallelism, numberOfTasks);
         assignment[assignedTask] += 1;
       }
   
       final Map<Integer, Long> assignmentStats = Lists.newArrayList(assignment).stream()
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       System.out.println(assignmentStats);
     }
   ```
   
   We have to write a custom partitioner that directly map bucket id to the subtask
   ```java
     static class BucketPartitioner implements Partitioner<Integer> {
   
       @Override
       public int partition(Integer key, int numPartitions) {
         return key % numPartitions;
       }
     }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560512464



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       Sounds good to me. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r562160730



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @electrum, as far as what a "local sort" means, I think option 2 sounds good to me for a task-level sort. If that sort is needlessly expensive, then it is okay for Trino to skip it. But I think that if a table has a defined sort order, the right thing would be for Trino to apply it.
   
   For data distribution, it sounds like Trino will only support `none` and `hash` modes in the short term. That's reasonable given that you can't stage data and use it twice. Even with shuffle data reuse, global sort in Spark is quite expensive in some cases (doing a large join twice, for example). Eventually, we want to get to where the table metadata has a sketch of the data distribution so you can use that to get ranges for a global ordering.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559018257



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       I think we're all in agreement to consider behavior across engines and consider settings that can be used anywhere. One of the major differences between batch and streaming is that it would be expensive and unusual to sort the records processed by a task, so actually producing records sorted locally within a task isn't something we should expect. My guess is that Flink won't support either local or global sort -- global is just task-level sort with a shared range partitioner.
   
   We do have a need to shuffle data to the write tasks in Flink. Shuffling by partitions is a good idea to start with because it covers the cases where hash partitioning is balanced. Skew in the partitions that @stevenzwu notes is a concern, but I'm also interested in handling skew in non-partition columns. If downstream reads are likely to filter by `country` in Steven's example, then clustering data by country in Flink is a good idea. Spark uses global ordering to handle this skew, which will estimate ranges of the sort keys and produce a partitioner config that balances the data.
   
   For Flink, what makes sense to me is to have options for the key-by operation: no key-by, key by `hash(partition)`, or key by `range(sort key)` -- where `range(sort key)` is determined using a method like Steven suggests. These distribution options correspond to the ones we plan to support in Spark, just without the task-level sort.
   
   I propose using a table property, `write.distribution.mode`, in both engines with values:
   * `none`: do not shuffle rows
   * `partition`: shuffle rows by `hash(partition)`
   * `sort`: shuffle rows by `range(sort key)` using a ranges provided by a skew calculation or from table metrics
   
   I considered a similar mode for sort, but Flink will probably ignore it and in Spark we are making it implicit: if the table's sort order is defined, then request a sort. So there is no need for an equivalent sort mode property.
   
   If we go with this, then we have one setting that works in both Flink and Spark. In Flink the property controls the `keyBy` behavior only, and in Spark it is combined with sort order to give the options that we've discussed:
   
   | Spark | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | distribute by partition key | range distribute by partition key |
   | ordered | no distribution, locally sorted | distribute by partition key, locally sorted | globally sorted |
   
   I think Flink would use a different sort key depending on whether the table order is set: it would range distribute by partition key or sort key. Here's an equivalent table:
   
   | Flink | `none` | `partition` | `sort` |
   |-|--------|-------------|--------|
   | unordered | no distribution or ordering | distribute by partition key | range distribute by partition key |
   | ordered | no distribution or ordering | distribute by partition key | range distribute by sort key |




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560392166



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,71 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+                                                     Map<String, String> properties,
+                                                     PartitionSpec partitionSpec,
+                                                     Schema iSchema,
+                                                     RowType flinkRowType) {
+      DistributionMode writeMode;
+      if (distributionMode == null) {
+        // Fallback to use distribution mode parsed from table properties if don't specify in job level.
+        String modeName = PropertyUtil.propertyAsString(properties,
+            WRITE_DISTRIBUTION_MODE,
+            WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+        writeMode = DistributionMode.fromName(modeName);
+      } else {
+        writeMode = distributionMode;
+      }
+
+      switch (writeMode) {
+        case NONE:
+          return input;
+
+        case HASH:
+          if (partitionSpec.isUnpartitioned()) {
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));

Review comment:
       I still have concerns on supporting this hash partition of `keyBy` the partition key due to the data skew problem that mentioned in the discussion thread.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560006175



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @rdblue  I like the table you provided,  I have few questions : For an iceberg table which has defined its __SortOder__  columns,   the spark job will write the sorted records (based on sort keys) into parquet files,  should the flink job also write the sorted records into parquet files ?  Should we keep the same semantic of __SortOrder__  among different engines  although it's not cheap to accomplish the goal ? (  I raise this question because I saw the __Flink__ table  does not require locally sorted or global sorted )  
   
   Or the definition of __SortOrder__  is to define the write behavior while don't define the read behavior that means the records read from parquet file don't have to be sorted by the sort-keys ? Seems like defining write behavior is the behavior you guys try to accomplish from my understanding based the above table.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559076846



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       @stevenzwu @openinx, what do you think about Ryan's proposal?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560619616



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_DISTRIBUTION_MODE = "write.distribution-mode";
+  public static final String WRITE_DISTRIBUTION_MODE_DEFAULT = "none";

Review comment:
       define the default value directly as the enum?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r557116059



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
   public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
   public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
 
+  public static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";

Review comment:
       > The other problem is that effective writer parallelism now is limited by the number of partition values. 
   
   Yes , it's true that writer parallelism will be limited by the number of partition/bucket values.  Currently,  seems like we could only create enough buckets so that the writer parallelism won't be so small.   For example,  we could create 64 buckets for a partition,  that means the ideally writer parallelism should be set as 64.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560374053



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");
+
+  private final String name;
+
+  DistributionMode(String mode) {
+    this.name = mode;
+  }
+
+  public String modeName() {
+    return name;
+  }
+
+  public static DistributionMode fromName(String name) {

Review comment:
       If we used `hash` and `range`, then this would just need to use `valueOf(name.toUpperCase(Locale.ROOT))` (with a null check, of course).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2064: Flink: Support write.distribution-mode.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560449935



##########
File path: api/src/main/java/org/apache/iceberg/DistributionMode.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Enum of supported write distribution mode, it defines the write behavior of batch or streaming job:
+ * <p>
+ * 1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few
+ * partitions, otherwise that may produce too many small files because each task is writing rows into different
+ * partitions randomly.
+ * <p>
+ * 2. hash-partition: hash distribute by partition keys, which is suitable for the scenarios where the rows are located
+ * into different partitions evenly.
+ * <p>
+ * 3. range-partition: range distribute by partition key (or sort key if table has an {@link SortOrder}), which is
+ * suitable for the scenarios where rows are located into different partitions with skew distribution.
+ */
+public enum DistributionMode {
+  NONE("none"), HASH("hash-partition"), RANGE("range-partition");

Review comment:
       +1 on not using the term "partition" when talking about distribution.
   
   W.r.t. naming, I did ask myself the question what should be the best names here. I tend to like "hash" and "range" a bit more as it may not be clear that `partition` refers to the table's partition spec.
   
   I guess the real question here is what does this table control? Are we allowing to control whether to use hash or range distribution or do we control whether the distribution is based on the partition spec or sort order?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org