You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/21 01:09:49 UTC

[iceberg] branch master updated: Flink: Support configurable parallelism for write tasks (#1619)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d3ec643  Flink: Support configurable parallelism for write tasks (#1619)
d3ec643 is described below

commit d3ec64385b2ca6687167fa09a7e768e6da7c61df
Author: openinx <op...@gmail.com>
AuthorDate: Wed Oct 21 09:09:35 2020 +0800

    Flink: Support configurable parallelism for write tasks (#1619)
---
 .../java/org/apache/iceberg/flink/sink/FlinkSink.java    | 16 +++++++++++++++-
 .../apache/iceberg/flink/sink/TestFlinkIcebergSink.java  |  2 ++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 7ff59d2..7f1d9ca 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -111,6 +111,7 @@ public class FlinkSink {
     private Table table;
     private TableSchema tableSchema;
     private boolean overwrite = false;
+    private Integer writeParallelism = null;
 
     private Builder() {
     }
@@ -156,6 +157,17 @@ public class FlinkSink {
       return this;
     }
 
+    /**
+     * Configures the number of parallel tasks used by the Iceberg stream writer operator.
+     *
+     * @param newWriteParallelism writer parallelism; if never set, the input stream's parallelism is used.
+     * @return this {@link Builder}, for method chaining.
+     */
+    public Builder writeParallelism(int newWriteParallelism) {
+      this.writeParallelism = newWriteParallelism;
+      return this;
+    }
+
     @SuppressWarnings("unchecked")
     public DataStreamSink<RowData> build() {
       Preconditions.checkArgument(rowDataInput != null,
@@ -174,9 +186,11 @@ public class FlinkSink {
       IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema);
       IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
 
+      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+
       DataStream<Void> returnStream = rowDataInput
           .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
-          .setParallelism(rowDataInput.getParallelism())
+          .setParallelism(writeParallelism)
           .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
           .setParallelism(1)
           .setMaxParallelism(1);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 18aae28..0022425 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -131,6 +131,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
     FlinkSink.forRowData(dataStream)
         .table(table)
         .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
         .build();
 
     // Execute the program.
@@ -153,6 +154,7 @@ public class TestFlinkIcebergSink extends AbstractTestBase {
         .table(table)
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
+        .writeParallelism(parallelism)
         .build();
 
     // Execute the program.