Posted to issues@iceberg.apache.org by "stevenzwu (via GitHub)" <gi...@apache.org> on 2023/04/10 01:48:54 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7039: Flink: Expose write-parallelism in SQL Hints

stevenzwu commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1161374565


##########
docs/flink-configuration.md:
##########
@@ -148,13 +148,14 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                  |
-| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes          |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                  |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode               |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
+| Flink option           | Default                                                              | Description                                                                                                                        |
+|------------------------|----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                                           | File format to use for this write operation; parquet, avro, or orc                                                                 |
+| target-file-size-bytes | As per table property                                                | Overrides this table's write.target-file-size-bytes                                                                                |
+| upsert-enabled         | Table write.upsert.enabled                                           | Overrides this table's write.upsert.enabled                                                                                        |
+| overwrite-enabled      | false                                                                | Overwrites the table's data; overwrite mode shouldn't be enabled when configuring an UPSERT data stream.                           |
+| distribution-mode      | Table write.distribution-mode                                        | Overrides this table's write.distribution-mode                                                                                     |
+| compression-codec      | Table write.(fileformat).compression-codec                           | Overrides this table's compression codec for this write                                                                            |
+| compression-level      | Table write.(fileformat).compression-level                           | Overrides this table's compression level for Parquet and Avro tables for this write                                                |
+| compression-strategy   | Table write.orc.compression-strategy                                 | Overrides this table's compression strategy for ORC tables for this write                                                          |
+| write-parallelism      | The same as the parallel of the input operator to the write operator | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   The same as the parallel of the input operator to the write operator
   -->
   Upstream operator parallelism
   
   Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator.
   -->
   Overrides the writer parallelism
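   
   For illustration, a minimal sketch of the hint in action, assuming a Flink `TableEnvironment` named `tEnv` and hypothetical table names `db.tbl` and `src` (neither is from the PR):
   
   ```java
   // Hedged sketch: the write-parallelism hint overrides the Iceberg stream
   // writer's parallelism for this statement only; without the hint, the
   // writer inherits the upstream operator's parallelism.
   tEnv.executeSql(
       "INSERT INTO db.tbl /*+ OPTIONS('write-parallelism'='4') */ "
           + "SELECT * FROM src");
   ```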



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);

Review Comment:
   why can't we reuse the table created in the `before` method?
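   
   A hedged sketch of the reuse being asked about, assuming the base class's `before()` method creates a table referenced by a `TABLE_NAME` constant (that constant name is an assumption, not taken from the PR):
   
   ```java
   // Sketch only: target the table created in before() instead of issuing a
   // fresh CREATE TABLE, so the test needs no extra DDL or DROP TABLE cleanup.
   String insertSQL =
       String.format(
           "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
           TABLE_NAME, SOURCE_TABLE);
   ```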



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);
+
+      PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+      String insertSQL =
+          String.format(
+              "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+              tableName, SOURCE_TABLE);
+      ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+      Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+      Transformation<?> committer = dummySink.getInputs().get(0);
+      Transformation<?> writer = committer.getInputs().get(0);
+
+      Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+      writer

Review Comment:
   why are we not doing `get(0)` for the upstream source operator?
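   
   For reference, a hedged sketch of the extra `get(0)` hop this suggests, continuing the chain walk from the quoted test (it assumes the same `dummySink` translation, and the final check only holds when the environment's default parallelism is not 1):
   
   ```java
   // Walk the translated chain: dummy sink <- committer <- writer <- upstream.
   Transformation<?> committer = dummySink.getInputs().get(0);
   Transformation<?> writer = committer.getInputs().get(0);
   Transformation<?> upstream = writer.getInputs().get(0);
   
   // The hint pins the writer to parallelism 1, while the upstream source
   // keeps its own parallelism.
   Assert.assertEquals(1, writer.getParallelism());
   Assert.assertNotEquals(
       "Upstream keeps its own parallelism (assumes env default != 1)",
       writer.getParallelism(), upstream.getParallelism());
   ```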



##########
docs/flink-configuration.md:
##########
@@ -148,13 +148,14 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                  |
-| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes          |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                  |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode               |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
+| Flink option           | Default                                                              | Description                                                                                                                        |
+|------------------------|----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                                           | File format to use for this write operation; parquet, avro, or orc                                                                 |
+| target-file-size-bytes | As per table property                                                | Overrides this table's write.target-file-size-bytes                                                                                |
+| upsert-enabled         | Table write.upsert.enabled                                           | Overrides this table's write.upsert.enabled                                                                                        |
+| overwrite-enabled      | false                                                                | Overwrites the table's data; overwrite mode shouldn't be enabled when configuring an UPSERT data stream.                           |
+| distribution-mode      | Table write.distribution-mode                                        | Overrides this table's write.distribution-mode                                                                                     |
+| compression-codec      | Table write.(fileformat).compression-codec                           | Overrides this table's compression codec for this write                                                                            |
+| compression-level      | Table write.(fileformat).compression-level                           | Overrides this table's compression level for Parquet and Avro tables for this write                                                |
+| compression-strategy   | Table write.orc.compression-strategy                                 | Overrides this table's compression strategy for ORC tables for this write                                                          |
+| write-parallelism      | The same as the parallel of the input operator to the write operator | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   Then maybe we can revert the table width change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

