Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/11 10:06:17 UTC

[GitHub] [iceberg] kbendick commented on a change in pull request #2064: Flink: Add option to shuffle by partition key in iceberg sink.

kbendick commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r554935039



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
##########
@@ -220,4 +232,52 @@ public void testInsertIntoPartition() throws Exception {
 
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
   }
+
+  @Test
+  public void testWithShuffleByPartition() throws Exception {
+    String tableName = "test_shuffle_by_partition";
+
+    Map<String, String> tableProps = ImmutableMap.of(
+        "write.format.default", format.name(),
+        "write.shuffle-by.partition", "true"
+    );
+    sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
+        tableName, toWithClause(tableProps));
+
+    // Insert data set.
+    sql("INSERT INTO %s VALUES " +
+        "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
+        "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
+        "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+    SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
+        SimpleDataUtil.createRecord(1, "aaa"),
+        SimpleDataUtil.createRecord(1, "bbb"),
+        SimpleDataUtil.createRecord(1, "ccc"),
+        SimpleDataUtil.createRecord(2, "aaa"),
+        SimpleDataUtil.createRecord(2, "bbb"),
+        SimpleDataUtil.createRecord(2, "ccc"),
+        SimpleDataUtil.createRecord(3, "aaa"),
+        SimpleDataUtil.createRecord(3, "bbb"),
+        SimpleDataUtil.createRecord(3, "ccc")
+    ));
+
+    Assert.assertEquals("Should 1 data file in partition 'aaa'", 1, partitionFiles(tableName, "aaa").size());

Review comment:
       Small nit: Consider `There should be [only] 1 data file in partition 'aaa'`, and similarly for the other assertion messages.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,46 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private boolean shouldShuffleByPartition(Map<String, String> properties) {
+      if (shuffleByPartition == null) {
+        // Use the configured table option if does not specify in FlinkSink explicitly.
+        return PropertyUtil.propertyAsBoolean(properties,
+            WRITE_SHUFFLE_BY_PARTITION,
+            WRITE_SHUFFLE_BY_PARTITION_DEFAULT);
+      } else {
+        return shuffleByPartition;

Review comment:
       Somewhat unrelated question: Is it possible to set these values at the cluster level (like with Flink's properties), or is the lookup order only the value set in code, then the table property?
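
       To make the question concrete, here is a minimal sketch of the lookup order as I read it from the diff: an explicit value set in code wins, otherwise the table property is used, otherwise a default. The class name, the `false` default, and the plain `Boolean.parseBoolean` parsing are assumptions for illustration only; the PR itself goes through `PropertyUtil.propertyAsBoolean`, and there is no cluster-level step in this chain.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-alone sketch; not the actual FlinkSink code.
final class ShuffleOptionSketch {
  // Property key as used in the test in this PR.
  static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";
  // Assumed default; the real default is defined elsewhere in the PR.
  static final boolean WRITE_SHUFFLE_BY_PARTITION_DEFAULT = false;

  // builderValue is null when nothing was set explicitly in code.
  static boolean shouldShuffleByPartition(Boolean builderValue, Map<String, String> tableProps) {
    if (builderValue != null) {
      // 1. An explicit setting in code takes precedence.
      return builderValue;
    }
    // 2. Otherwise fall back to the table property, then to the default.
    String value = tableProps.get(WRITE_SHUFFLE_BY_PARTITION);
    return value == null ? WRITE_SHUFFLE_BY_PARTITION_DEFAULT : Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    Map<String, String> enabled = Collections.singletonMap(WRITE_SHUFFLE_BY_PARTITION, "true");
    // Table property is used when code does not set a value.
    System.out.println(shouldShuffleByPartition(null, enabled));
    // Explicit code value overrides the table property.
    System.out.println(shouldShuffleByPartition(Boolean.FALSE, enabled));
    // Neither is set, so the assumed default applies.
    System.out.println(shouldShuffleByPartition(null, Collections.emptyMap()));
  }
}
```

       A cluster-level setting, if it were supported, would presumably slot in as an extra fallback between the table property and the default, which is why I am asking where it would sit.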



