Posted to issues@iceberg.apache.org by "hililiwei (via GitHub)" <gi...@apache.org> on 2023/03/08 06:19:48 UTC

[GitHub] [iceberg] hililiwei opened a new pull request, #7039: Flink: Expose write-parallelism in SQL Hints

hililiwei opened a new pull request, #7039:
URL: https://github.com/apache/iceberg/pull/7039

   Co-authored-by: chidayong <24...@qq.com>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1133156763


##########
docs/flink-getting-started.md:
##########
@@ -860,16 +860,17 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                                                                |
-|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------|
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc                                         |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes                                                        |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                                                                |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode                                                             |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write                                                    |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                        |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                  |
+| Flink option           | Default                                    | Description                                                                                                                        |
+|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc                                                                 |
+| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes                                                                                |
+| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                                                                                        |
+| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.                         |
+| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode                                                                                     |
+| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write                                                                            |
+| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                                                |
+| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                                          |
+| write-parallelism      | null                                       | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   Nit: I generally feel we should avoid code-specific terms like `null` in these kinds of docs (it makes sense for specs or data types, because `null` is a necessary concept there). The default just needs to describe the default value or behavior in this case, imo. Since the default is described as `the same as the parallel of the input operator to the write operator.`, could we just put that in instead of null?





[GitHub] [iceberg] hililiwei commented on pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#issuecomment-1500043986

   cc @stevenzwu 




[GitHub] [iceberg] stevenzwu merged pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu merged PR #7039:
URL: https://github.com/apache/iceberg/pull/7039




[GitHub] [iceberg] hililiwei commented on pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#issuecomment-1504407063

   thanks @stevenzwu @amogh-jahagirdar 




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1161374565


##########
docs/flink-configuration.md:
##########
@@ -148,13 +148,14 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                  |
-| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes          |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                  |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode               |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
+| Flink option           | Default                                                              | Description                                                                                                                        |
+|------------------------|----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                                           | File format to use for this write operation; parquet, avro, or orc                                                                 |
+| target-file-size-bytes | As per table property                                                | Overrides this table's write.target-file-size-bytes                                                                                |
+| upsert-enabled         | Table write.upsert.enabled                                           | Overrides this table's write.upsert.enabled                                                                                        |
+| overwrite-enabled      | false                                                                | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.                         |
+| distribution-mode      | Table write.distribution-mode                                        | Overrides this table's write.distribution-mode                                                                                     |
+| compression-codec      | Table write.(fileformat).compression-codec                           | Overrides this table's compression codec for this write                                                                            |
+| compression-level      | Table write.(fileformat).compression-level                           | Overrides this table's compression level for Parquet and Avro tables for this write                                                |
+| compression-strategy   | Table write.orc.compression-strategy                                 | Overrides this table's compression strategy for ORC tables for this write                                                          |
+| write-parallelism      | The same as the parallel of the input operator to the write operator | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   The same as the parallel of the input operator to the write operator
   -->
   Upstream operator parallelism
   
   Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator.
   -->
   Overrides the writer parallelism
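
For readers following the wording discussion, the behavior being documented can be sketched roughly as below. This is only an illustration under stated assumptions, not the code in this PR: the class `WriteParallelismSketch`, the method `resolve`, and the positive-value check are hypothetical. Only the option name `write-parallelism` and the fallback to the upstream operator parallelism come from the docs table above.

```java
import java.util.Map;

// Hypothetical illustration only; this is not Iceberg's FlinkSink implementation.
final class WriteParallelismSketch {
  // Option name taken from the docs table in this PR.
  static final String WRITE_PARALLELISM = "write-parallelism";

  // Returns the writer parallelism: the hint value when present,
  // otherwise the parallelism of the upstream (input) operator.
  static int resolve(Map<String, String> hintOptions, int upstreamParallelism) {
    String value = hintOptions.get(WRITE_PARALLELISM);
    if (value == null) {
      // Option not set: fall back to the upstream operator parallelism.
      return upstreamParallelism;
    }
    int parallelism = Integer.parseInt(value.trim());
    if (parallelism <= 0) {
      // Assumption: non-positive values are rejected in this sketch.
      throw new IllegalArgumentException("write-parallelism must be positive: " + value);
    }
    return parallelism;
  }

  public static void main(String[] args) {
    // No hint given: the writer follows the upstream parallelism.
    System.out.println(resolve(Map.of(), 4));
    // Hint given, as in OPTIONS('write-parallelism'='1').
    System.out.println(resolve(Map.of(WRITE_PARALLELISM, "1"), 4));
  }
}
```

In this sketch an absent option is represented by a missing map key rather than a documented `null`, which matches the suggestion to describe the default as behavior instead of a value.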



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);

Review Comment:
   Why can't we reuse the table created in the `before` method?



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);
+
+      PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+      String insertSQL =
+          String.format(
+              "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+              tableName, SOURCE_TABLE);
+      ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+      Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+      Transformation<?> committer = dummySink.getInputs().get(0);
+      Transformation<?> writer = committer.getInputs().get(0);
+
+      Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+      writer

Review Comment:
   Why are we not doing `get(0)` for the upstream source operator?



##########
docs/flink-configuration.md:
##########
@@ -148,13 +148,14 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                  |
-| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes          |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                  |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode               |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
+| Flink option           | Default                                                              | Description                                                                                                                        |
+|------------------------|----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                                           | File format to use for this write operation; parquet, avro, or orc                                                                 |
+| target-file-size-bytes | As per table property                                                | Overrides this table's write.target-file-size-bytes                                                                                |
+| upsert-enabled         | Table write.upsert.enabled                                           | Overrides this table's write.upsert.enabled                                                                                        |
+| overwrite-enabled      | false                                                                | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.                         |
+| distribution-mode      | Table write.distribution-mode                                        | Overrides this table's write.distribution-mode                                                                                     |
+| compression-codec      | Table write.(fileformat).compression-codec                           | Overrides this table's compression codec for this write                                                                            |
+| compression-level      | Table write.(fileformat).compression-level                           | Overrides this table's compression level for Parquet and Avro tables for this write                                                |
+| compression-strategy   | Table write.orc.compression-strategy                                 | Overrides this table's compression strategy for ORC tables for this write                                                          |
+| write-parallelism      | The same as the parallel of the input operator to the write operator | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   Then maybe we can revert the table width change.





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1133156763


##########
docs/flink-getting-started.md:
##########
@@ -860,16 +860,17 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                                                                |
-|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------|
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc                                         |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes                                                        |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                                                                |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode                                                             |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write                                                    |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                        |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                  |
+| Flink option           | Default                                    | Description                                                                                                                        |
+|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc                                                                 |
+| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes                                                                                |
+| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                                                                                        |
+| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.                         |
+| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode                                                                                     |
+| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write                                                                            |
+| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                                                |
+| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                                          |
+| write-parallelism      | null                                       | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   Nit: I generally feel we should avoid code-specific terms like `null` in these kinds of docs (it makes sense for specs or data types, because `null` is a necessary concept there). The default just needs to describe the default value or behavior in this case, imo, since that's what a user cares about. Since the default is described as `the same as the parallel of the input operator to the write operator.`, could we just put that in instead of null?





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1136486706


##########
docs/flink-getting-started.md:
##########
@@ -860,16 +860,17 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                                                                |
-|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------|
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc                                         |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes                                                        |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                                                                |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode                                                             |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write                                                    |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                        |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                  |
+| Flink option           | Default                                    | Description                                                                                                                        |
+|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc                                                                 |
+| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes                                                                                |
+| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                                                                                        |
+| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.                         |
+| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode                                                                                     |
+| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write                                                                            |
+| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                                                |
+| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                                          |
+| write-parallelism      | null                                       | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   Reasonable, modified. Thx.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1162407139


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);

Review Comment:
   modified.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);
+
+      PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+      String insertSQL =
+          String.format(
+              "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+              tableName, SOURCE_TABLE);
+      ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+      Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+      Transformation<?> committer = dummySink.getInputs().get(0);
+      Transformation<?> writer = committer.getInputs().get(0);
+
+      Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+      writer

Review Comment:
   The reason `get(0)` is not used here is to express that all upstream sources should have the default parallelism.
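
   To illustrate the difference between checking only `get(0)` and checking every input, here is a small sketch. It uses a hypothetical `Node` stand-in with a `parallelism` field and an `inputs` list; it is not Flink's `Transformation` class, and the parallelism values are made up for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: Node is a hypothetical stand-in, NOT Flink's Transformation.
final class ParallelismCheck {
  private ParallelismCheck() {}

  static final class Node {
    final int parallelism;
    final List<Node> inputs;

    Node(int parallelism, List<Node> inputs) {
      this.parallelism = parallelism;
      this.inputs = inputs;
    }
  }

  // True only if every direct input of the node has the expected parallelism.
  static boolean allInputsHaveParallelism(Node node, int expected) {
    return node.inputs.stream().allMatch(input -> input.parallelism == expected);
  }

  public static void main(String[] args) {
    Node sourceA = new Node(4, Collections.emptyList());
    Node sourceB = new Node(2, Collections.emptyList());
    Node writer = new Node(1, Arrays.asList(sourceA, sourceB));

    // Looking only at the first input misses that sourceB differs.
    System.out.println(writer.inputs.get(0).parallelism == 4);
    // Checking all inputs catches it.
    System.out.println(allInputsHaveParallelism(writer, 4));
  }
}
```

   With a single source, as in this test, both checks give the same answer; iterating over all inputs only states the intent more explicitly.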





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7039: Flink: Expose write-parallelism in SQL Hints

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1163014239


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);
+
+      PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+      String insertSQL =
+          String.format(
+              "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+              tableName, SOURCE_TABLE);
+      ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+      Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+      Transformation<?> committer = dummySink.getInputs().get(0);
+      Transformation<?> writer = committer.getInputs().get(0);
+
+      Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+      writer

Review Comment:
   But here we only have one source. Anyway, this is fine too. Will merge.


