Posted to commits@iceberg.apache.org by st...@apache.org on 2023/04/12 16:01:04 UTC
[iceberg] 01/02: Flink 1.15: Port "Expose write-parallelism in SQL Hints" to 1.15
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 325d282766a7151b68111191a92ea91fbac02f88
Author: Liwei Li <hi...@gmail.com>
AuthorDate: Wed Apr 12 10:50:08 2023 +0800
Flink 1.15: Port "Expose write-parallelism in SQL Hints" to 1.15
---
.../org/apache/iceberg/flink/FlinkWriteConf.java | 4 +++
.../apache/iceberg/flink/FlinkWriteOptions.java | 3 ++
.../org/apache/iceberg/flink/sink/FlinkSink.java | 9 +++--
.../apache/iceberg/flink/TestFlinkTableSink.java | 40 ++++++++++++++++++++++
4 files changed, 53 insertions(+), 3 deletions(-)
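This commit lets the Iceberg sink's write parallelism be set per statement through a SQL hint, in addition to the existing FlinkSink builder method. A minimal usage sketch (the table names are hypothetical, and an Iceberg catalog is assumed to be registered already):

    // Sketch: "iceberg_sink" and "source_table" are hypothetical names.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
        "INSERT INTO iceberg_sink /*+ OPTIONS('write-parallelism'='4') */"
            + " SELECT id, data FROM source_table");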
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 4b5c7e4a0d..aba23389f2 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -180,4 +180,8 @@ public class FlinkWriteConf {
.defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
.parse();
}
+
+ public Integer writeParallelism() {
+ return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
+ }
}
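Because parseOptional() is used here, writeParallelism() returns null when the option was never configured, so callers can distinguish "unset" from any explicit value. A small sketch of the fallback pattern this enables, mirroring the FlinkSink change further down:

    // Sketch: null means 'write-parallelism' was not set, so fall back to
    // the parallelism of the writer's input operator.
    Integer configured = flinkWriteConf.writeParallelism();
    int parallelism = (configured != null) ? configured : input.getParallelism();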
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index 86cb2fb0eb..ba0931318e 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -61,4 +61,7 @@ public class FlinkWriteOptions {
// Branch to write to
public static final ConfigOption<String> BRANCH =
ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
+
+ public static final ConfigOption<Integer> WRITE_PARALLELISM =
+ ConfigOptions.key("write-parallelism").intType().noDefaultValue();
}
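The option deliberately declares noDefaultValue(), which keeps "absent" distinguishable from any explicit setting. Reading it straight from a Flink Configuration illustrates the behavior (a sketch, not code from the commit):

    // Sketch: getOptional(...) is empty because WRITE_PARALLELISM has no default.
    Configuration conf = new Configuration();
    Optional<Integer> parallelism = conf.getOptional(FlinkWriteOptions.WRITE_PARALLELISM);
    // parallelism.isPresent() stays false until 'write-parallelism' is set.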
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 445b6a6ff9..46a229d4c8 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -132,7 +132,6 @@ public class FlinkSink {
private TableLoader tableLoader;
private Table table;
private TableSchema tableSchema;
- private Integer writeParallelism = null;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
private final Map<String, String> snapshotProperties = Maps.newHashMap();
@@ -248,7 +247,8 @@ public class FlinkSink {
* @return {@link Builder} to connect the iceberg table.
*/
public Builder writeParallelism(int newWriteParallelism) {
- this.writeParallelism = newWriteParallelism;
+ writeOptions.put(
+ FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
return this;
}
@@ -464,7 +464,10 @@ public class FlinkSink {
IcebergStreamWriter<RowData> streamWriter =
createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);
- int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+ int parallelism =
+ flinkWriteConf.writeParallelism() == null
+ ? input.getParallelism()
+ : flinkWriteConf.writeParallelism();
SingleOutputStreamOperator<WriteResult> writerStream =
input
.transform(
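With this change the builder and the SQL hint converge on the same FlinkWriteConf lookup: an explicitly configured write-parallelism wins, and otherwise the writer inherits the parallelism of its input operator. The equivalent DataStream API usage, as a sketch with a hypothetical table location:

    // Sketch: the builder now records the value in writeOptions, so it flows
    // through FlinkWriteConf exactly like the SQL hint. The path is hypothetical.
    FlinkSink.forRowData(rowDataStream)
        .tableLoader(TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl"))
        .writeParallelism(2)
        .append();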
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index c4c75edd9e..7540627989 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -18,15 +18,20 @@
*/
package org.apache.iceberg.flink;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
@@ -178,6 +183,41 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
}
+ @Test
+ public void testWriteParallelism() throws Exception {
+ List<Row> dataSet =
+ IntStream.range(1, 1000)
+ .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+ sql(
+ "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+ + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+ SOURCE_TABLE, dataId);
+
+ PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+ String insertSQL =
+ String.format(
+ "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+ TABLE_NAME, SOURCE_TABLE);
+ ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+ Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+ Transformation<?> committer = dummySink.getInputs().get(0);
+ Transformation<?> writer = committer.getInputs().get(0);
+
+ Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+ writer
+ .getInputs()
+ .forEach(
+ input ->
+ Assert.assertEquals(
+ "Should have the expected parallelism.",
+ isStreamingJob ? 2 : 4,
+ input.getParallelism()));
+ }
+
@Test
public void testReplacePartitions() throws Exception {
Assume.assumeFalse(