You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/21 01:09:49 UTC
[iceberg] branch master updated: Flink: Support configurable
parallelism for write tasks (#1619)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d3ec643 Flink: Support configurable parallelism for write tasks (#1619)
d3ec643 is described below
commit d3ec64385b2ca6687167fa09a7e768e6da7c61df
Author: openinx <op...@gmail.com>
AuthorDate: Wed Oct 21 09:09:35 2020 +0800
Flink: Support configurable parallelism for write tasks (#1619)
---
.../java/org/apache/iceberg/flink/sink/FlinkSink.java | 16 +++++++++++++++-
.../apache/iceberg/flink/sink/TestFlinkIcebergSink.java | 2 ++
2 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 7ff59d2..7f1d9ca 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -111,6 +111,7 @@ public class FlinkSink {
private Table table;
private TableSchema tableSchema;
private boolean overwrite = false;
+ private Integer writeParallelism = null;
private Builder() {
}
@@ -156,6 +157,17 @@ public class FlinkSink {
return this;
}
+ /**
+ * Configure the write parallelism for the iceberg stream writer.
+ *
+ * @param newWriteParallelism the number of parallel iceberg stream writers.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder writeParallelism(int newWriteParallelism) {
+ this.writeParallelism = newWriteParallelism;
+ return this;
+ }
+
@SuppressWarnings("unchecked")
public DataStreamSink<RowData> build() {
Preconditions.checkArgument(rowDataInput != null,
@@ -174,9 +186,11 @@ public class FlinkSink {
IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema);
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+ this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+
DataStream<Void> returnStream = rowDataInput
.transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
- .setParallelism(rowDataInput.getParallelism())
+ .setParallelism(writeParallelism)
.transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
.setParallelism(1)
.setMaxParallelism(1);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 18aae28..0022425 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -131,6 +131,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
FlinkSink.forRowData(dataStream)
.table(table)
.tableLoader(tableLoader)
+ .writeParallelism(parallelism)
.build();
// Execute the program.
@@ -153,6 +154,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
.table(table)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
+ .writeParallelism(parallelism)
.build();
// Execute the program.