Posted to issues@iceberg.apache.org by "hililiwei (via GitHub)" <gi...@apache.org> on 2023/04/11 07:29:36 UTC

[GitHub] [iceberg] hililiwei commented on a diff in pull request #7039: Flink: Expose write-parallelism in SQL Hints

hililiwei commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1162407139


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);

Review Comment:
   modified.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);
+
+      PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+      String insertSQL =
+          String.format(
+              "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+              tableName, SOURCE_TABLE);
+      ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+      Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+      Transformation<?> committer = dummySink.getInputs().get(0);
+      Transformation<?> writer = committer.getInputs().get(0);
+
+      Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+      writer

Review Comment:
   `get(0)` is not used here in order to express that all of the upstream sources should keep the default parallelism.
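   
   As a minimal sketch of how that could read, the continuation of the hunk above might iterate over all of the writer's inputs rather than indexing a single one with `get(0)`. The names `writer`, `getInputs()`, and `getParallelism()` come from the diff itself; `assumedDefaultParallelism` is a placeholder introduced only for illustration and is not the PR's actual code:
   
   ```java
   // Hypothetical continuation, assuming the variables from the hunk above are in scope.
   // The concrete default-parallelism value depends on the test environment.
   int assumedDefaultParallelism = 4; // placeholder, not taken from the PR
   
   writer
       .getInputs()
       .forEach(
           input ->
               Assert.assertEquals(
                   "Upstream sources should keep the default parallelism.",
                   assumedDefaultParallelism,
                   input.getParallelism()));
   ```
   
   Checking every input rather than a single one makes the intent explicit: only the Iceberg writer is pinned by the `write-parallelism` hint, while the upstream sources are left at the default.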



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org