Posted to commits@iceberg.apache.org by st...@apache.org on 2023/04/12 16:01:03 UTC

[iceberg] branch master updated (df31406b3d -> 09a1efc0c0)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


    from df31406b3d Build: Bump Nessie from 0.56.0 to 0.57.0 (#7323)
     new 325d282766 Flink 1.15: Port Expose write-parallelism in SQL Hints to 1.15
     new 09a1efc0c0 Flink 1.17: Port Expose write-parallelism in SQL Hints to 1.17

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |  4 +++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  3 ++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  9 +++--
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 40 ++++++++++++++++++++++
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |  4 +++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  3 ++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  9 +++--
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 40 ++++++++++++++++++++++
 8 files changed, 106 insertions(+), 6 deletions(-)
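
For reference, the ported change lets a SQL INSERT control the Iceberg writer
operator's parallelism through a dynamic table options hint. A minimal sketch of
the intended usage, assuming hypothetical tables "iceberg_sink" and "source_table"
registered elsewhere:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class WriteParallelismHintExample {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The hint caps the Iceberg writer operator at 4 parallel subtasks; the rest
        // of the pipeline keeps whatever parallelism the environment assigns.
        tEnv.executeSql(
            "INSERT INTO iceberg_sink /*+ OPTIONS('write-parallelism'='4') */"
                + " SELECT * FROM source_table");
      }
    }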


[iceberg] 01/02: Flink 1.15: Port Expose write-parallelism in SQL Hints to 1.15

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 325d282766a7151b68111191a92ea91fbac02f88
Author: Liwei Li <hi...@gmail.com>
AuthorDate: Wed Apr 12 10:50:08 2023 +0800

    Flink 1.15: Port Expose write-parallelism in SQL Hints to 1.15
---
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |  4 +++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  3 ++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  9 +++--
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 40 ++++++++++++++++++++++
 4 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 4b5c7e4a0d..aba23389f2 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -180,4 +180,8 @@ public class FlinkWriteConf {
         .defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
         .parse();
   }
+
+  public Integer writeParallelism() {
+    return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
+  }
 }
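
Note on the accessor above: parseOptional() returns null when "write-parallelism"
was never set, and that null is what tells the sink (see the FlinkSink diff below)
to fall back to the upstream operator's parallelism. A hypothetical helper
isolating that rule:

    // Mirrors the fallback in the FlinkSink diff below: null from
    // FlinkWriteConf.writeParallelism() means "inherit the input's parallelism".
    final class ParallelismFallback {
      static int resolve(Integer configured, int inputParallelism) {
        return configured != null ? configured : inputParallelism;
      }
    }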
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index 86cb2fb0eb..ba0931318e 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -61,4 +61,7 @@ public class FlinkWriteOptions {
   // Branch to write to
   public static final ConfigOption<String> BRANCH =
       ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
+
+  public static final ConfigOption<Integer> WRITE_PARALLELISM =
+      ConfigOptions.key("write-parallelism").intType().noDefaultValue();
 }
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 445b6a6ff9..46a229d4c8 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -132,7 +132,6 @@ public class FlinkSink {
     private TableLoader tableLoader;
     private Table table;
     private TableSchema tableSchema;
-    private Integer writeParallelism = null;
     private List<String> equalityFieldColumns = null;
     private String uidPrefix = null;
     private final Map<String, String> snapshotProperties = Maps.newHashMap();
@@ -248,7 +247,8 @@ public class FlinkSink {
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder writeParallelism(int newWriteParallelism) {
-      this.writeParallelism = newWriteParallelism;
+      writeOptions.put(
+          FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
       return this;
     }
 
@@ -464,7 +464,10 @@ public class FlinkSink {
       IcebergStreamWriter<RowData> streamWriter =
           createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);
 
-      int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+      int parallelism =
+          flinkWriteConf.writeParallelism() == null
+              ? input.getParallelism()
+              : flinkWriteConf.writeParallelism();
       SingleOutputStreamOperator<WriteResult> writerStream =
           input
               .transform(
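
The DataStream API path is unchanged for callers: writeParallelism(int) now simply
records the value under the "write-parallelism" key in the builder's writeOptions
map, so SQL hints and the Java builder resolve through the same FlinkWriteConf
lookup. A sketch, assuming the stream and table loader are constructed elsewhere:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    class SinkExample {
      // writeParallelism(4) is stored as the "write-parallelism" option and read
      // back through FlinkWriteConf when the sink graph is built.
      static void append(DataStream<RowData> stream, TableLoader loader) {
        FlinkSink.forRowData(stream)
            .tableLoader(loader)
            .writeParallelism(4)
            .append();
      }
    }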
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index c4c75edd9e..7540627989 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -18,15 +18,20 @@
  */
 package org.apache.iceberg.flink;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
@@ -178,6 +183,41 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+    String insertSQL =
+        String.format(
+            "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+            TABLE_NAME, SOURCE_TABLE);
+    ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+    Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+    Transformation<?> committer = dummySink.getInputs().get(0);
+    Transformation<?> writer = committer.getInputs().get(0);
+
+    Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+    writer
+        .getInputs()
+        .forEach(
+            input ->
+                Assert.assertEquals(
+                    "Should have the expected parallelism.",
+                    isStreamingJob ? 2 : 4,
+                    input.getParallelism()));
+  }
+
   @Test
   public void testReplacePartitions() throws Exception {
     Assume.assumeFalse(

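The new test pins the behavior down by compiling the INSERT with the internal
planner and walking the resulting Transformation graph (sink <- committer <-
writer), asserting the writer runs at parallelism 1 while its inputs keep the
harness default (2 in streaming mode, 4 in batch). Outside a test harness, a
rougher way to inspect the same thing is to print the JSON execution plan, which
reports per-operator parallelism; a sketch reusing the hypothetical table names
from above:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.ExplainDetail;
    import org.apache.flink.table.api.TableEnvironment;

    public class ExplainHintExample {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Each operator in the printed JSON plan carries a "parallelism" field, so
        // the hint's effect on the writer node is visible without planner internals.
        System.out.println(
            tEnv.explainSql(
                "INSERT INTO iceberg_sink /*+ OPTIONS('write-parallelism'='1') */"
                    + " SELECT * FROM source_table",
                ExplainDetail.JSON_EXECUTION_PLAN));
      }
    }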

[iceberg] 02/02: Flink 1.17: Port Expose write-parallelism in SQL Hints to 1.17

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 09a1efc0c03d64a4c178587cab479ca8fe4f32ac
Author: Liwei Li <hi...@gmail.com>
AuthorDate: Wed Apr 12 10:50:15 2023 +0800

    Flink 1.17: Port Expose write-parallelism in SQL Hints to 1.17
---
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |  4 +++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  3 ++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  9 +++--
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 40 ++++++++++++++++++++++
 4 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 4b5c7e4a0d..aba23389f2 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -180,4 +180,8 @@ public class FlinkWriteConf {
         .defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
         .parse();
   }
+
+  public Integer writeParallelism() {
+    return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
+  }
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index 86cb2fb0eb..ba0931318e 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -61,4 +61,7 @@ public class FlinkWriteOptions {
   // Branch to write to
   public static final ConfigOption<String> BRANCH =
       ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
+
+  public static final ConfigOption<Integer> WRITE_PARALLELISM =
+      ConfigOptions.key("write-parallelism").intType().noDefaultValue();
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 445b6a6ff9..46a229d4c8 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -132,7 +132,6 @@ public class FlinkSink {
     private TableLoader tableLoader;
     private Table table;
     private TableSchema tableSchema;
-    private Integer writeParallelism = null;
     private List<String> equalityFieldColumns = null;
     private String uidPrefix = null;
     private final Map<String, String> snapshotProperties = Maps.newHashMap();
@@ -248,7 +247,8 @@ public class FlinkSink {
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder writeParallelism(int newWriteParallelism) {
-      this.writeParallelism = newWriteParallelism;
+      writeOptions.put(
+          FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
       return this;
     }
 
@@ -464,7 +464,10 @@ public class FlinkSink {
       IcebergStreamWriter<RowData> streamWriter =
           createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);
 
-      int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+      int parallelism =
+          flinkWriteConf.writeParallelism() == null
+              ? input.getParallelism()
+              : flinkWriteConf.writeParallelism();
       SingleOutputStreamOperator<WriteResult> writerStream =
           input
               .transform(
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index c4c75edd9e..7540627989 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -18,15 +18,20 @@
  */
 package org.apache.iceberg.flink;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
@@ -178,6 +183,41 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+    String insertSQL =
+        String.format(
+            "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+            TABLE_NAME, SOURCE_TABLE);
+    ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+    Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+    Transformation<?> committer = dummySink.getInputs().get(0);
+    Transformation<?> writer = committer.getInputs().get(0);
+
+    Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+    writer
+        .getInputs()
+        .forEach(
+            input ->
+                Assert.assertEquals(
+                    "Should have the expected parallelism.",
+                    isStreamingJob ? 2 : 4,
+                    input.getParallelism()));
+  }
+
   @Test
   public void testReplacePartitions() throws Exception {
     Assume.assumeFalse(