Posted to commits@iceberg.apache.org by st...@apache.org on 2023/04/12 16:01:05 UTC

[iceberg] 02/02: Flink 1.17: Port Expose write-parallelism in SQL Hints to 1.17

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 09a1efc0c03d64a4c178587cab479ca8fe4f32ac
Author: Liwei Li <hi...@gmail.com>
AuthorDate: Wed Apr 12 10:50:15 2023 +0800

    Flink 1.17: Port Expose write-parallelism in SQL Hints to 1.17
---
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |  4 +++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  3 ++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  9 +++--
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 40 ++++++++++++++++++++++
 4 files changed, 53 insertions(+), 3 deletions(-)
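
For context, a minimal usage sketch of the hint this commit exposes (not part
of the commit itself; it assumes an Iceberg catalog is already registered, and
the db.sink / db.src table names are illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class WriteParallelismHintExample {
      public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The hint pins the Iceberg writer operator to parallelism 4,
        // independent of the upstream operators' parallelism.
        tEnv.executeSql(
            "INSERT INTO db.sink /*+ OPTIONS('write-parallelism'='4') */ "
                + "SELECT * FROM db.src");
      }
    }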

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 4b5c7e4a0d..aba23389f2 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -180,4 +180,8 @@ public class FlinkWriteConf {
         .defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
         .parse();
   }
+
+  public Integer writeParallelism() {
+    return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
+  }
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index 86cb2fb0eb..ba0931318e 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -61,4 +61,7 @@ public class FlinkWriteOptions {
   // Branch to write to
   public static final ConfigOption<String> BRANCH =
       ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
+
+  public static final ConfigOption<Integer> WRITE_PARALLELISM =
+      ConfigOptions.key("write-parallelism").intType().noDefaultValue();
 }
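
Because the new option is declared with noDefaultValue(), reading it while
unset yields an empty value rather than a fallback, which is what lets the
sink distinguish "hint absent" from an explicit setting. A small illustration
against Flink's standard Configuration API (not part of the commit):

    import org.apache.flink.configuration.Configuration;
    import org.apache.iceberg.flink.FlinkWriteOptions;

    public class NoDefaultIllustration {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Optional.empty(): noDefaultValue() implies nothing when unset
        System.out.println(conf.getOptional(FlinkWriteOptions.WRITE_PARALLELISM));
        conf.set(FlinkWriteOptions.WRITE_PARALLELISM, 4);
        System.out.println(conf.get(FlinkWriteOptions.WRITE_PARALLELISM)); // 4
      }
    }
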
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 445b6a6ff9..46a229d4c8 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -132,7 +132,6 @@ public class FlinkSink {
     private TableLoader tableLoader;
     private Table table;
     private TableSchema tableSchema;
-    private Integer writeParallelism = null;
     private List<String> equalityFieldColumns = null;
     private String uidPrefix = null;
     private final Map<String, String> snapshotProperties = Maps.newHashMap();
@@ -248,7 +247,8 @@ public class FlinkSink {
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder writeParallelism(int newWriteParallelism) {
-      this.writeParallelism = newWriteParallelism;
+      writeOptions.put(
+          FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
       return this;
     }
 
@@ -464,7 +464,10 @@ public class FlinkSink {
       IcebergStreamWriter<RowData> streamWriter =
           createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);
 
-      int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+      int parallelism =
+          flinkWriteConf.writeParallelism() == null
+              ? input.getParallelism()
+              : flinkWriteConf.writeParallelism();
       SingleOutputStreamOperator<WriteResult> writerStream =
           input
               .transform(
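
On the DataStream API path, writeParallelism(...) now just records the value
in the builder's writeOptions map, so existing callers keep the same behavior.
A sketch, assuming a DataStream<RowData> named rowDataStream and a TableLoader
named tableLoader are created elsewhere (both names are illustrative):

    import org.apache.iceberg.flink.sink.FlinkSink;

    // equivalent to the SQL hint; resolved through FlinkWriteConf at build time
    FlinkSink.forRowData(rowDataStream)
        .tableLoader(tableLoader)
        .writeParallelism(4) // stored under the 'write-parallelism' key
        .append();
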
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index c4c75edd9e..7540627989 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -18,15 +18,20 @@
  */
 package org.apache.iceberg.flink;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
@@ -178,6 +183,41 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+    String insertSQL =
+        String.format(
+            "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+            TABLE_NAME, SOURCE_TABLE);
+    ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+    Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+    Transformation<?> committer = dummySink.getInputs().get(0);
+    Transformation<?> writer = committer.getInputs().get(0);
+
+    Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+    writer
+        .getInputs()
+        .forEach(
+            input ->
+                Assert.assertEquals(
+                    "Should have the expected parallelism.",
+                    isStreamingJob ? 2 : 4,
+                    input.getParallelism()));
+  }
+
   @Test
   public void testReplacePartitions() throws Exception {
     Assume.assumeFalse(